From eaaf2ad60cbae1e04b9775070b71e131295c0a6a Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Thu, 12 Mar 2026 23:27:34 -0300 Subject: [PATCH] executor abstraction, graphene to strawberry --- api/graphql.py | 138 ++++-------- api/main.py | 5 +- api/schema/graphql.py | 194 ++++++++-------- docs/architecture/01c-gcp-architecture.dot | 83 +++++++ docs/architecture/01c-gcp-architecture.svg | 210 ++++++++++++++++++ docs/architecture/04-media-storage.md | 50 ++++- docs/index.html | 28 ++- modelgen/generator/__init__.py | 8 +- .../generator/{graphene.py => strawberry.py} | 118 +++++----- modelgen/types.py | 36 +-- requirements.txt | 7 +- task/executor.py | 74 ++++++ task/gcp_handler.py | 121 ++++++++++ 13 files changed, 796 insertions(+), 276 deletions(-) create mode 100644 docs/architecture/01c-gcp-architecture.dot create mode 100644 docs/architecture/01c-gcp-architecture.svg rename modelgen/generator/{graphene.py => strawberry.py} (62%) create mode 100644 task/gcp_handler.py diff --git a/api/graphql.py b/api/graphql.py index ec57f06..d3b5292 100644 --- a/api/graphql.py +++ b/api/graphql.py @@ -1,5 +1,5 @@ """ -GraphQL API using graphene, mounted on FastAPI/Starlette. +GraphQL API using strawberry, served via FastAPI. Primary API for MPR — all client interactions go through GraphQL. Uses Django ORM directly for data access. @@ -7,8 +7,11 @@ Types are generated from schema/ via modelgen — see api/schema/graphql.py. """ import os +from typing import List, Optional +from uuid import UUID -import graphene +import strawberry +from strawberry.types import Info from api.schema.graphql import ( CreateJobInput, @@ -22,7 +25,6 @@ from api.schema.graphql import ( ) from core.storage import BUCKET_IN, list_objects -# Media extensions (same as assets route) VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS @@ -33,23 +35,15 @@ MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS # --------------------------------------------------------------------------- -class Query(graphene.ObjectType): - assets = graphene.List( - MediaAssetType, - status=graphene.String(), - search=graphene.String(), - ) - asset = graphene.Field(MediaAssetType, id=graphene.UUID(required=True)) - jobs = graphene.List( - TranscodeJobType, - status=graphene.String(), - source_asset_id=graphene.UUID(), - ) - job = graphene.Field(TranscodeJobType, id=graphene.UUID(required=True)) - presets = graphene.List(TranscodePresetType) - system_status = graphene.Field(SystemStatusType) - - def resolve_assets(self, info, status=None, search=None): +@strawberry.type +class Query: + @strawberry.field + def assets( + self, + info: Info, + status: Optional[str] = None, + search: Optional[str] = None, + ) -> List[MediaAssetType]: from mpr.media_assets.models import MediaAsset qs = MediaAsset.objects.all() @@ -57,9 +51,10 @@ class Query(graphene.ObjectType): qs = qs.filter(status=status) if search: qs = qs.filter(filename__icontains=search) - return qs + return list(qs) - def resolve_asset(self, info, id): + @strawberry.field + def asset(self, info: Info, id: UUID) -> Optional[MediaAssetType]: from mpr.media_assets.models import MediaAsset try: @@ -67,7 +62,13 @@ class Query(graphene.ObjectType): except MediaAsset.DoesNotExist: return None - def resolve_jobs(self, info, status=None, source_asset_id=None): + @strawberry.field + def jobs( + self, + info: Info, + status: Optional[str] = None, + source_asset_id: Optional[UUID] = None, + ) -> List[TranscodeJobType]: from mpr.media_assets.models import TranscodeJob qs = TranscodeJob.objects.all() @@ -75,9 +76,10 @@ class Query(graphene.ObjectType): qs = qs.filter(status=status) if source_asset_id: qs = qs.filter(source_asset_id=source_asset_id) - return qs + return list(qs) - def resolve_job(self, info, id): + @strawberry.field + def job(self, info: Info, id: UUID) -> Optional[TranscodeJobType]: from mpr.media_assets.models import TranscodeJob try: @@ -85,13 +87,15 @@ class Query(graphene.ObjectType): except TranscodeJob.DoesNotExist: return None - def resolve_presets(self, info): + @strawberry.field + def presets(self, info: Info) -> List[TranscodePresetType]: from mpr.media_assets.models import TranscodePreset - return TranscodePreset.objects.all() + return list(TranscodePreset.objects.all()) - def resolve_system_status(self, info): - return {"status": "ok", "version": "0.1.0"} + @strawberry.field + def system_status(self, info: Info) -> SystemStatusType: + return SystemStatusType(status="ok", version="0.1.0") # --------------------------------------------------------------------------- @@ -99,13 +103,10 @@ class Query(graphene.ObjectType): # --------------------------------------------------------------------------- -class ScanMediaFolder(graphene.Mutation): - class Arguments: - pass - - Output = ScanResultType - - def mutate(self, info): +@strawberry.type +class Mutation: + @strawberry.mutation + def scan_media_folder(self, info: Info) -> ScanResultType: from mpr.media_assets.models import MediaAsset objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) @@ -135,14 +136,8 @@ class ScanMediaFolder(graphene.Mutation): files=registered, ) - -class CreateJob(graphene.Mutation): - class Arguments: - input = CreateJobInput(required=True) - - Output = TranscodeJobType - - def mutate(self, info, input): + @strawberry.mutation + def create_job(self, info: Info, input: CreateJobInput) -> TranscodeJobType: from pathlib import Path from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset @@ -186,9 +181,8 @@ class CreateJob(graphene.Mutation): priority=input.priority or 0, ) - # Dispatch executor_mode = os.environ.get("MPR_EXECUTOR", "local") - if executor_mode == "lambda": + if executor_mode in ("lambda", "gcp"): from task.executor import get_executor get_executor().run( @@ -217,14 +211,8 @@ class CreateJob(graphene.Mutation): return job - -class CancelJob(graphene.Mutation): - class Arguments: - id = graphene.UUID(required=True) - - Output = TranscodeJobType - - def mutate(self, info, id): + @strawberry.mutation + def cancel_job(self, info: Info, id: UUID) -> TranscodeJobType: from mpr.media_assets.models import TranscodeJob try: @@ -239,14 +227,8 @@ class CancelJob(graphene.Mutation): job.save(update_fields=["status"]) return job - -class RetryJob(graphene.Mutation): - class Arguments: - id = graphene.UUID(required=True) - - Output = TranscodeJobType - - def mutate(self, info, id): + @strawberry.mutation + def retry_job(self, info: Info, id: UUID) -> TranscodeJobType: from mpr.media_assets.models import TranscodeJob try: @@ -263,15 +245,8 @@ class RetryJob(graphene.Mutation): job.save(update_fields=["status", "progress", "error_message"]) return job - -class UpdateAsset(graphene.Mutation): - class Arguments: - id = graphene.UUID(required=True) - input = UpdateAssetInput(required=True) - - Output = MediaAssetType - - def mutate(self, info, id, input): + @strawberry.mutation + def update_asset(self, info: Info, id: UUID, input: UpdateAssetInput) -> MediaAssetType: from mpr.media_assets.models import MediaAsset try: @@ -292,14 +267,8 @@ class UpdateAsset(graphene.Mutation): return asset - -class DeleteAsset(graphene.Mutation): - class Arguments: - id = graphene.UUID(required=True) - - Output = DeleteResultType - - def mutate(self, info, id): + @strawberry.mutation + def delete_asset(self, info: Info, id: UUID) -> DeleteResultType: from mpr.media_assets.models import MediaAsset try: @@ -310,17 +279,8 @@ class DeleteAsset(graphene.Mutation): raise Exception("Asset not found") -class Mutation(graphene.ObjectType): - scan_media_folder = ScanMediaFolder.Field() - create_job = CreateJob.Field() - cancel_job = CancelJob.Field() - retry_job = RetryJob.Field() - update_asset = UpdateAsset.Field() - delete_asset = DeleteAsset.Field() - - # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- -schema = graphene.Schema(query=Query, mutation=Mutation) +schema = strawberry.Schema(query=Query, mutation=Mutation) diff --git a/api/main.py b/api/main.py index 8b4e7f4..f803276 100644 --- a/api/main.py +++ b/api/main.py @@ -21,7 +21,7 @@ django.setup() from fastapi import FastAPI, Header, HTTPException from fastapi.middleware.cors import CORSMiddleware -from starlette_graphene3 import GraphQLApp, make_graphiql_handler +from strawberry.fastapi import GraphQLRouter from api.graphql import schema as graphql_schema @@ -45,7 +45,8 @@ app.add_middleware( ) # GraphQL -app.mount("/graphql", GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler())) +graphql_router = GraphQLRouter(schema=graphql_schema, graphql_ide="graphiql") +app.include_router(graphql_router, prefix="/graphql") @app.get("/") diff --git a/api/schema/graphql.py b/api/schema/graphql.py index 4a44266..48e2eb4 100644 --- a/api/schema/graphql.py +++ b/api/schema/graphql.py @@ -1,19 +1,26 @@ """ -Graphene Types - GENERATED FILE +Strawberry Types - GENERATED FILE Do not edit directly. Regenerate using modelgen. """ -import graphene +import strawberry +from enum import Enum +from typing import List, Optional +from uuid import UUID +from datetime import datetime +from strawberry.scalars import JSON -class AssetStatus(graphene.Enum): +@strawberry.enum +class AssetStatus(Enum): PENDING = "pending" READY = "ready" ERROR = "error" -class JobStatus(graphene.Enum): +@strawberry.enum +class JobStatus(Enum): PENDING = "pending" PROCESSING = "processing" COMPLETED = "completed" @@ -21,122 +28,131 @@ class JobStatus(graphene.Enum): CANCELLED = "cancelled" -class MediaAssetType(graphene.ObjectType): +@strawberry.type +class MediaAssetType: """A video/audio file registered in the system.""" - id = graphene.UUID() - filename = graphene.String() - file_path = graphene.String() - status = graphene.String() - error_message = graphene.String() - file_size = graphene.Int() - duration = graphene.Float() - video_codec = graphene.String() - audio_codec = graphene.String() - width = graphene.Int() - height = graphene.Int() - framerate = graphene.Float() - bitrate = graphene.Int() - properties = graphene.JSONString() - comments = graphene.String() - tags = graphene.List(graphene.String) - created_at = graphene.DateTime() - updated_at = graphene.DateTime() + id: Optional[UUID] = None + filename: Optional[str] = None + file_path: Optional[str] = None + status: Optional[str] = None + error_message: Optional[str] = None + file_size: Optional[int] = None + duration: Optional[float] = None + video_codec: Optional[str] = None + audio_codec: Optional[str] = None + width: Optional[int] = None + height: Optional[int] = None + framerate: Optional[float] = None + bitrate: Optional[int] = None + properties: Optional[JSON] = None + comments: Optional[str] = None + tags: Optional[List[str]] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None -class TranscodePresetType(graphene.ObjectType): +@strawberry.type +class TranscodePresetType: """A reusable transcoding configuration (like Handbrake presets).""" - id = graphene.UUID() - name = graphene.String() - description = graphene.String() - is_builtin = graphene.Boolean() - container = graphene.String() - video_codec = graphene.String() - video_bitrate = graphene.String() - video_crf = graphene.Int() - video_preset = graphene.String() - resolution = graphene.String() - framerate = graphene.Float() - audio_codec = graphene.String() - audio_bitrate = graphene.String() - audio_channels = graphene.Int() - audio_samplerate = graphene.Int() - extra_args = graphene.List(graphene.String) - created_at = graphene.DateTime() - updated_at = graphene.DateTime() + id: Optional[UUID] = None + name: Optional[str] = None + description: Optional[str] = None + is_builtin: Optional[bool] = None + container: Optional[str] = None + video_codec: Optional[str] = None + video_bitrate: Optional[str] = None + video_crf: Optional[int] = None + video_preset: Optional[str] = None + resolution: Optional[str] = None + framerate: Optional[float] = None + audio_codec: Optional[str] = None + audio_bitrate: Optional[str] = None + audio_channels: Optional[int] = None + audio_samplerate: Optional[int] = None + extra_args: Optional[List[str]] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None -class TranscodeJobType(graphene.ObjectType): +@strawberry.type +class TranscodeJobType: """A transcoding or trimming job in the queue.""" - id = graphene.UUID() - source_asset_id = graphene.UUID() - preset_id = graphene.UUID() - preset_snapshot = graphene.JSONString() - trim_start = graphene.Float() - trim_end = graphene.Float() - output_filename = graphene.String() - output_path = graphene.String() - output_asset_id = graphene.UUID() - status = graphene.String() - progress = graphene.Float() - current_frame = graphene.Int() - current_time = graphene.Float() - speed = graphene.String() - error_message = graphene.String() - celery_task_id = graphene.String() - execution_arn = graphene.String() - priority = graphene.Int() - created_at = graphene.DateTime() - started_at = graphene.DateTime() - completed_at = graphene.DateTime() + id: Optional[UUID] = None + source_asset_id: Optional[UUID] = None + preset_id: Optional[UUID] = None + preset_snapshot: Optional[JSON] = None + trim_start: Optional[float] = None + trim_end: Optional[float] = None + output_filename: Optional[str] = None + output_path: Optional[str] = None + output_asset_id: Optional[UUID] = None + status: Optional[str] = None + progress: Optional[float] = None + current_frame: Optional[int] = None + current_time: Optional[float] = None + speed: Optional[str] = None + error_message: Optional[str] = None + celery_task_id: Optional[str] = None + execution_arn: Optional[str] = None + priority: Optional[int] = None + created_at: Optional[datetime] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None -class CreateJobInput(graphene.InputObjectType): +@strawberry.input +class CreateJobInput: """Request body for creating a transcode/trim job.""" - source_asset_id = graphene.UUID(required=True) - preset_id = graphene.UUID() - trim_start = graphene.Float() - trim_end = graphene.Float() - output_filename = graphene.String() - priority = graphene.Int(default_value=0) + source_asset_id: UUID + preset_id: Optional[UUID] = None + trim_start: Optional[float] = None + trim_end: Optional[float] = None + output_filename: Optional[str] = None + priority: int = 0 -class UpdateAssetInput(graphene.InputObjectType): +@strawberry.input +class UpdateAssetInput: """Request body for updating asset metadata.""" - comments = graphene.String() - tags = graphene.List(graphene.String) + comments: Optional[str] = None + tags: Optional[List[str]] = None -class SystemStatusType(graphene.ObjectType): +@strawberry.type +class SystemStatusType: """System status response.""" - status = graphene.String() - version = graphene.String() + status: Optional[str] = None + version: Optional[str] = None -class ScanResultType(graphene.ObjectType): +@strawberry.type +class ScanResultType: """Result of scanning the media input bucket.""" - found = graphene.Int() - registered = graphene.Int() - skipped = graphene.Int() - files = graphene.List(graphene.String) + found: Optional[int] = None + registered: Optional[int] = None + skipped: Optional[int] = None + files: Optional[List[str]] = None -class DeleteResultType(graphene.ObjectType): +@strawberry.type +class DeleteResultType: """Result of a delete operation.""" - ok = graphene.Boolean() + ok: Optional[bool] = None -class WorkerStatusType(graphene.ObjectType): +@strawberry.type +class WorkerStatusType: """Worker health and capabilities.""" - available = graphene.Boolean() - active_jobs = graphene.Int() - supported_codecs = graphene.List(graphene.String) - gpu_available = graphene.Boolean() + available: Optional[bool] = None + active_jobs: Optional[int] = None + supported_codecs: Optional[List[str]] = None + gpu_available: Optional[bool] = None diff --git a/docs/architecture/01c-gcp-architecture.dot b/docs/architecture/01c-gcp-architecture.dot new file mode 100644 index 0000000..8fc866c --- /dev/null +++ b/docs/architecture/01c-gcp-architecture.dot @@ -0,0 +1,83 @@ +digraph gcp_architecture { + rankdir=TB + node [shape=box, style=rounded, fontname="Helvetica"] + edge [fontname="Helvetica", fontsize=10] + + labelloc="t" + label="MPR - GCP Architecture (Cloud Run Jobs + GCS)" + fontsize=16 + fontname="Helvetica-Bold" + + graph [splines=ortho, nodesep=0.8, ranksep=0.8] + + // External + subgraph cluster_external { + label="External" + style=dashed + color=gray + + browser [label="Browser\nmpr.mcrn.ar", shape=ellipse] + } + + // Nginx reverse proxy + subgraph cluster_proxy { + label="Reverse Proxy" + style=filled + fillcolor="#e8f4f8" + + nginx [label="nginx\nport 80"] + } + + // Application layer + subgraph cluster_apps { + label="Application Layer" + style=filled + fillcolor="#f0f8e8" + + django [label="Django Admin\n/admin\nport 8701"] + fastapi [label="GraphQL API\n/graphql\nport 8702"] + timeline [label="Timeline UI\n/\nport 5173"] + } + + // Data layer (still local) + subgraph cluster_data { + label="Data Layer" + style=filled + fillcolor="#f8e8f0" + + postgres [label="PostgreSQL\nport 5436", shape=cylinder] + } + + // GCP layer + subgraph cluster_gcp { + label="Google Cloud" + style=filled + fillcolor="#e8f0fd" + + cloud_run_job [label="Cloud Run Job\nFFmpeg container\ntranscoding"] + gcs [label="GCS Buckets\n(S3-compat API)", shape=folder] + bucket_in [label="mpr-media-in/\ninput videos", shape=note] + bucket_out [label="mpr-media-out/\ntranscoded output", shape=note] + } + + // Connections + browser -> nginx [label="HTTP"] + + nginx -> django [xlabel="/admin"] + nginx -> fastapi [xlabel="/graphql"] + nginx -> timeline [xlabel="/"] + + timeline -> fastapi [label="GraphQL"] + django -> postgres + + fastapi -> postgres [label="read/write jobs"] + fastapi -> cloud_run_job [label="google-cloud-run\nrun_job() + payload\nexecution_name"] + + cloud_run_job -> gcs [label="S3 compat (HMAC)\ndownload input\nupload output"] + cloud_run_job -> fastapi [label="POST /jobs/{id}/callback\nupdate status"] + + fastapi -> postgres [label="callback updates\njob status"] + + gcs -> bucket_in [style=dotted, arrowhead=none] + gcs -> bucket_out [style=dotted, arrowhead=none] +} diff --git a/docs/architecture/01c-gcp-architecture.svg b/docs/architecture/01c-gcp-architecture.svg new file mode 100644 index 0000000..9f24e4d --- /dev/null +++ b/docs/architecture/01c-gcp-architecture.svg @@ -0,0 +1,210 @@ + + + + + + +gcp_architecture + +MPR - GCP Architecture (Cloud Run Jobs + GCS) + +cluster_external + +External + + +cluster_proxy + +Reverse Proxy + + +cluster_apps + +Application Layer + + +cluster_data + +Data Layer + + +cluster_gcp + +Google Cloud + + + +browser + +Browser +mpr.mcrn.ar + + + +nginx + +nginx +port 80 + + + +browser->nginx + + +HTTP + + + +django + +Django Admin +/admin +port 8701 + + + +nginx->django + + +/admin + + + +fastapi + +GraphQL API +/graphql +port 8702 + + + +nginx->fastapi + + +/graphql + + + +timeline + +Timeline UI +/ +port 5173 + + + +nginx->timeline + + +/ + + + +postgres + + +PostgreSQL +port 5436 + + + +django->postgres + + + + + +fastapi->postgres + + +read/write jobs + + + +fastapi->postgres + + +callback updates +job status + + + +cloud_run_job + +Cloud Run Job +FFmpeg container +transcoding + + + +fastapi->cloud_run_job + + +google-cloud-run +run_job() + payload +execution_name + + + +timeline->fastapi + + +GraphQL + + + +cloud_run_job->fastapi + + +POST /jobs/{id}/callback +update status + + + +gcs + +GCS Buckets +(S3-compat API) + + + +cloud_run_job->gcs + + +S3 compat (HMAC) +download input +upload output + + + +bucket_in + + + +mpr-media-in/ +input videos + + + +gcs->bucket_in + + + + +bucket_out + + + +mpr-media-out/ +transcoded output + + + +gcs->bucket_out + + + + diff --git a/docs/architecture/04-media-storage.md b/docs/architecture/04-media-storage.md index a410c6b..89bf9df 100644 --- a/docs/architecture/04-media-storage.md +++ b/docs/architecture/04-media-storage.md @@ -68,6 +68,47 @@ aws s3 cp video.mp4 s3://mpr-media-in/ aws s3 sync /local/media/ s3://mpr-media-in/ ``` +## GCP Production (GCS via S3 compatibility) + +GCS exposes an S3-compatible API. The same `core/storage.py` boto3 code works +with no changes — only the endpoint and credentials differ. + +### GCS HMAC Keys +Generate under **Cloud Storage → Settings → Interoperability** in the GCP console. +These act as `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`. + +### Configuration +```bash +S3_ENDPOINT_URL=https://storage.googleapis.com +S3_BUCKET_IN=mpr-media-in +S3_BUCKET_OUT=mpr-media-out +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= + +# Executor +MPR_EXECUTOR=gcp +GCP_PROJECT_ID=my-project +GCP_REGION=us-central1 +CLOUD_RUN_JOB=mpr-transcode +CALLBACK_URL=https://mpr.mcrn.ar/api +CALLBACK_API_KEY= +``` + +### Upload Files to GCS +```bash +gcloud storage cp video.mp4 gs://mpr-media-in/ + +# Or with the aws CLI via compat endpoint +aws --endpoint-url https://storage.googleapis.com s3 cp video.mp4 s3://mpr-media-in/ +``` + +### Cloud Run Job Handler +`task/gcp_handler.py` is the Cloud Run Job entrypoint. It reads the job payload +from `MPR_JOB_PAYLOAD` (injected by `GCPExecutor`), uses `core/storage` for all +GCS access (S3 compat), and POSTs the completion callback to the API. + +Set the Cloud Run Job command to: `python -m task.gcp_handler` + ## Storage Module `core/storage.py` provides all S3 operations: @@ -114,7 +155,14 @@ mutation { scanMediaFolder { found registered skipped files } } 4. Uploads result to `S3_BUCKET_OUT` 5. Calls back to API with result -Both paths use the same S3 buckets and key structure. +### Cloud Run Job Mode (GCP) +1. `GCPExecutor` triggers Cloud Run Job with payload in `MPR_JOB_PAYLOAD` +2. `task/gcp_handler.py` downloads source from `S3_BUCKET_IN` (GCS S3 compat) +3. Runs FFmpeg in container +4. Uploads result to `S3_BUCKET_OUT` (GCS S3 compat) +5. Calls back to API with result + +All three paths use the same S3-compatible bucket names and key structure. ## Supported File Types diff --git a/docs/index.html b/docs/index.html index f72e35e..7b5f774 100644 --- a/docs/index.html +++ b/docs/index.html @@ -9,8 +9,9 @@

MPR - Media Processor

- Media transcoding platform with dual execution modes: local (Celery - + MinIO) and cloud (AWS Step Functions + Lambda + S3). + Media transcoding platform with three execution modes: local (Celery + + MinIO), AWS (Step Functions + Lambda + S3), and GCP (Cloud Run + Jobs + GCS). Storage is S3-compatible across all environments.