Compare commits

..

2 Commits

Author SHA1 Message Date
3eeedebb15 major refactor 2026-03-13 01:07:02 -03:00
eaaf2ad60c executor abstraction, graphene to strawberry 2026-03-12 23:27:34 -03:00
69 changed files with 1457 additions and 738 deletions

View File

@@ -71,12 +71,12 @@ docker compose logs -f
docker compose logs -f celery
# Create admin user
docker compose exec django python manage.py createsuperuser
docker compose exec django python admin/manage.py createsuperuser
```
## Code Generation
Models are defined as dataclasses in `schema/models/` and generated via `modelgen`:
Models are defined as dataclasses in `core/schema/models/` and generated via `modelgen`:
- **Django ORM** models (`--include dataclasses,enums`)
- **Pydantic** schemas (`--include dataclasses,enums`)
- **TypeScript** types (`--include dataclasses,enums,api`)
@@ -113,26 +113,29 @@ See [docs/media-storage.md](docs/media-storage.md) for full details.
```
mpr/
├── api/ # FastAPI application
│ ├── routes/ # API endpoints
│ └── schemas/ # Pydantic models (generated)
├── core/ # Core utilities
│ └── ffmpeg/ # FFmpeg wrappers
├── admin/ # Django project
│ ├── manage.py # Django management script
│ └── mpr/ # Django settings & app
│ └── media_assets/# Django app
├── core/ # Core application logic
│ ├── api/ # FastAPI + GraphQL API
│ │ └── schema/ # GraphQL types (generated)
│ ├── ffmpeg/ # FFmpeg wrappers
│ ├── rpc/ # gRPC server & client
│ │ └── protos/ # Protobuf definitions (generated)
│ ├── schema/ # Source of truth
│ │ └── models/ # Dataclass definitions
│ ├── storage/ # S3/GCP/local storage backends
│ └── task/ # Celery job execution
│ ├── executor.py # Executor abstraction
│ └── tasks.py # Celery tasks
├── ctrl/ # Docker & deployment
│ ├── docker-compose.yml
│ └── nginx.conf
├── media/
│ ├── in/ # Source media files
│ └── out/ # Transcoded output
├── rpc/ # gRPC server & client
│ └── protos/ # Protobuf definitions (generated)
├── mpr/ # Django project
│ └── media_assets/ # Django app
├── schema/ # Source of truth
│ └── models/ # Dataclass definitions
├── task/ # Celery job execution
│ ├── executor.py # Executor abstraction
│ └── tasks.py # Celery tasks
├── modelgen/ # Code generation tool
└── ui/ # Frontend
└── timeline/ # React app
```

View File

@@ -6,7 +6,9 @@ import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mpr.settings')
# Ensure project root is on sys.path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'admin.mpr.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:

View File

@@ -11,6 +11,6 @@ import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mpr.settings')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'admin.mpr.settings')
application = get_asgi_application()

View File

@@ -2,9 +2,9 @@ import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mpr.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "admin.mpr.settings")
app = Celery("mpr")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
app.autodiscover_tasks(["task"])
app.autodiscover_tasks(["core.task"])

View File

@@ -3,5 +3,6 @@ from django.apps import AppConfig
class MediaAssetsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "mpr.media_assets"
name = "admin.mpr.media_assets"
label = "media_assets"
verbose_name = "Media Assets"

View File

@@ -4,10 +4,10 @@ from pathlib import Path
from django.core.management.base import BaseCommand
from mpr.media_assets.models import TranscodePreset
from admin.mpr.media_assets.models import TranscodePreset
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent.parent.parent))
from schema.models import BUILTIN_PRESETS
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent.parent.parent.parent))
from core.schema.models import BUILTIN_PRESETS
class Command(BaseCommand):

View File

@@ -1,8 +1,7 @@
# Generated by Django 6.0.1 on 2026-02-01 15:13
# Generated by Django 4.2.29 on 2026-03-13 04:04
import django.db.models.deletion
import uuid
from django.db import migrations, models
import uuid
class Migration(migrations.Migration):
@@ -13,47 +12,21 @@ class Migration(migrations.Migration):
]
operations = [
migrations.CreateModel(
name='TranscodePreset',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.CharField(max_length=100, unique=True)),
('description', models.TextField(blank=True, default='')),
('is_builtin', models.BooleanField(default=False)),
('container', models.CharField(default='mp4', max_length=20)),
('video_codec', models.CharField(default='libx264', max_length=50)),
('video_bitrate', models.CharField(blank=True, max_length=20, null=True)),
('video_crf', models.IntegerField(blank=True, null=True)),
('video_preset', models.CharField(blank=True, max_length=20, null=True)),
('resolution', models.CharField(blank=True, max_length=20, null=True)),
('framerate', models.FloatField(blank=True, null=True)),
('audio_codec', models.CharField(default='aac', max_length=50)),
('audio_bitrate', models.CharField(blank=True, max_length=20, null=True)),
('audio_channels', models.IntegerField(blank=True, null=True)),
('audio_samplerate', models.IntegerField(blank=True, null=True)),
('extra_args', models.JSONField(blank=True, default=list)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'ordering': ['name'],
},
),
migrations.CreateModel(
name='MediaAsset',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('filename', models.CharField(max_length=500)),
('file_path', models.CharField(max_length=1000)),
('status', models.CharField(choices=[('pending', 'Pending Probe'), ('ready', 'Ready'), ('error', 'Error')], default='pending', max_length=20)),
('error_message', models.TextField(blank=True, null=True)),
('status', models.CharField(choices=[('pending', 'Pending'), ('ready', 'Ready'), ('error', 'Error')], default='pending', max_length=20)),
('error_message', models.TextField(blank=True, default='')),
('file_size', models.BigIntegerField(blank=True, null=True)),
('duration', models.FloatField(blank=True, null=True)),
('video_codec', models.CharField(blank=True, max_length=50, null=True)),
('audio_codec', models.CharField(blank=True, max_length=50, null=True)),
('width', models.IntegerField(blank=True, null=True)),
('height', models.IntegerField(blank=True, null=True)),
('framerate', models.FloatField(blank=True, null=True)),
('duration', models.FloatField(blank=True, default=None, null=True)),
('video_codec', models.CharField(blank=True, max_length=255, null=True)),
('audio_codec', models.CharField(blank=True, max_length=255, null=True)),
('width', models.IntegerField(blank=True, default=None, null=True)),
('height', models.IntegerField(blank=True, default=None, null=True)),
('framerate', models.FloatField(blank=True, default=None, null=True)),
('bitrate', models.BigIntegerField(blank=True, null=True)),
('properties', models.JSONField(blank=True, default=dict)),
('comments', models.TextField(blank=True, default='')),
@@ -63,36 +36,61 @@ class Migration(migrations.Migration):
],
options={
'ordering': ['-created_at'],
'indexes': [models.Index(fields=['status'], name='media_asset_status_9ea2f2_idx'), models.Index(fields=['created_at'], name='media_asset_created_368039_idx')],
},
),
migrations.CreateModel(
name='TranscodeJob',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('source_asset_id', models.UUIDField()),
('preset_id', models.UUIDField(blank=True, null=True)),
('preset_snapshot', models.JSONField(blank=True, default=dict)),
('trim_start', models.FloatField(blank=True, null=True)),
('trim_end', models.FloatField(blank=True, null=True)),
('trim_start', models.FloatField(blank=True, default=None, null=True)),
('trim_end', models.FloatField(blank=True, default=None, null=True)),
('output_filename', models.CharField(max_length=500)),
('output_path', models.CharField(blank=True, max_length=1000, null=True)),
('output_asset_id', models.UUIDField(blank=True, null=True)),
('status', models.CharField(choices=[('pending', 'Pending'), ('processing', 'Processing'), ('completed', 'Completed'), ('failed', 'Failed'), ('cancelled', 'Cancelled')], default='pending', max_length=20)),
('progress', models.FloatField(default=0.0)),
('current_frame', models.IntegerField(blank=True, null=True)),
('current_time', models.FloatField(blank=True, null=True)),
('speed', models.CharField(blank=True, max_length=20, null=True)),
('error_message', models.TextField(blank=True, null=True)),
('celery_task_id', models.CharField(blank=True, max_length=100, null=True)),
('current_frame', models.IntegerField(blank=True, default=None, null=True)),
('current_time', models.FloatField(blank=True, default=None, null=True)),
('speed', models.CharField(blank=True, max_length=255, null=True)),
('error_message', models.TextField(blank=True, default='')),
('celery_task_id', models.CharField(blank=True, max_length=255, null=True)),
('execution_arn', models.CharField(blank=True, max_length=255, null=True)),
('priority', models.IntegerField(default=0)),
('created_at', models.DateTimeField(auto_now_add=True)),
('started_at', models.DateTimeField(blank=True, null=True)),
('completed_at', models.DateTimeField(blank=True, null=True)),
('output_asset', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='source_jobs', to='media_assets.mediaasset')),
('source_asset', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='transcode_jobs', to='media_assets.mediaasset')),
('preset', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='jobs', to='media_assets.transcodepreset')),
],
options={
'ordering': ['priority', 'created_at'],
'indexes': [models.Index(fields=['status', 'priority'], name='media_asset_status_e6ac18_idx'), models.Index(fields=['created_at'], name='media_asset_created_ba3a46_idx'), models.Index(fields=['celery_task_id'], name='media_asset_celery__81a88e_idx')],
'ordering': ['-created_at'],
},
),
migrations.CreateModel(
name='TranscodePreset',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.CharField(max_length=255)),
('description', models.TextField(blank=True, default='')),
('is_builtin', models.BooleanField(default=False)),
('container', models.CharField(max_length=255)),
('video_codec', models.CharField(max_length=255)),
('video_bitrate', models.CharField(blank=True, max_length=255, null=True)),
('video_crf', models.IntegerField(blank=True, default=None, null=True)),
('video_preset', models.CharField(blank=True, max_length=255, null=True)),
('resolution', models.CharField(blank=True, max_length=255, null=True)),
('framerate', models.FloatField(blank=True, default=None, null=True)),
('audio_codec', models.CharField(max_length=255)),
('audio_bitrate', models.CharField(blank=True, max_length=255, null=True)),
('audio_channels', models.IntegerField(blank=True, default=None, null=True)),
('audio_samplerate', models.IntegerField(blank=True, default=None, null=True)),
('extra_args', models.JSONField(blank=True, default=list)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'ordering': ['-created_at'],
},
),
]

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import environ
BASE_DIR = Path(__file__).resolve().parent.parent
BASE_DIR = Path(__file__).resolve().parent.parent.parent
env = environ.Env(
DEBUG=(bool, False),
@@ -27,7 +27,7 @@ INSTALLED_APPS = [
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"mpr.media_assets",
"admin.mpr.media_assets",
]
MIDDLEWARE = [
@@ -40,7 +40,7 @@ MIDDLEWARE = [
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = "mpr.urls"
ROOT_URLCONF = "admin.mpr.urls"
TEMPLATES = [
{
@@ -57,7 +57,7 @@ TEMPLATES = [
},
]
WSGI_APPLICATION = "mpr.wsgi.application"
WSGI_APPLICATION = "admin.mpr.wsgi.application"
# Database
DATABASE_URL = env("DATABASE_URL", default="sqlite:///db.sqlite3")

View File

@@ -11,6 +11,6 @@ import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mpr.settings')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'admin.mpr.settings')
application = get_wsgi_application()

View File

@@ -1,326 +0,0 @@
"""
GraphQL API using graphene, mounted on FastAPI/Starlette.
Primary API for MPR — all client interactions go through GraphQL.
Uses Django ORM directly for data access.
Types are generated from schema/ via modelgen — see api/schema/graphql.py.
"""
import os
import graphene
from api.schema.graphql import (
CreateJobInput,
DeleteResultType,
MediaAssetType,
ScanResultType,
SystemStatusType,
TranscodeJobType,
TranscodePresetType,
UpdateAssetInput,
)
from core.storage import BUCKET_IN, list_objects
# Media extensions (same as assets route)
VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"}
AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"}
MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS
# ---------------------------------------------------------------------------
# Queries
# ---------------------------------------------------------------------------
class Query(graphene.ObjectType):
assets = graphene.List(
MediaAssetType,
status=graphene.String(),
search=graphene.String(),
)
asset = graphene.Field(MediaAssetType, id=graphene.UUID(required=True))
jobs = graphene.List(
TranscodeJobType,
status=graphene.String(),
source_asset_id=graphene.UUID(),
)
job = graphene.Field(TranscodeJobType, id=graphene.UUID(required=True))
presets = graphene.List(TranscodePresetType)
system_status = graphene.Field(SystemStatusType)
def resolve_assets(self, info, status=None, search=None):
from mpr.media_assets.models import MediaAsset
qs = MediaAsset.objects.all()
if status:
qs = qs.filter(status=status)
if search:
qs = qs.filter(filename__icontains=search)
return qs
def resolve_asset(self, info, id):
from mpr.media_assets.models import MediaAsset
try:
return MediaAsset.objects.get(id=id)
except MediaAsset.DoesNotExist:
return None
def resolve_jobs(self, info, status=None, source_asset_id=None):
from mpr.media_assets.models import TranscodeJob
qs = TranscodeJob.objects.all()
if status:
qs = qs.filter(status=status)
if source_asset_id:
qs = qs.filter(source_asset_id=source_asset_id)
return qs
def resolve_job(self, info, id):
from mpr.media_assets.models import TranscodeJob
try:
return TranscodeJob.objects.get(id=id)
except TranscodeJob.DoesNotExist:
return None
def resolve_presets(self, info):
from mpr.media_assets.models import TranscodePreset
return TranscodePreset.objects.all()
def resolve_system_status(self, info):
return {"status": "ok", "version": "0.1.0"}
# ---------------------------------------------------------------------------
# Mutations
# ---------------------------------------------------------------------------
class ScanMediaFolder(graphene.Mutation):
class Arguments:
pass
Output = ScanResultType
def mutate(self, info):
from mpr.media_assets.models import MediaAsset
objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS)
existing = set(MediaAsset.objects.values_list("filename", flat=True))
registered = []
skipped = []
for obj in objects:
if obj["filename"] in existing:
skipped.append(obj["filename"])
continue
try:
MediaAsset.objects.create(
filename=obj["filename"],
file_path=obj["key"],
file_size=obj["size"],
)
registered.append(obj["filename"])
except Exception:
pass
return ScanResultType(
found=len(objects),
registered=len(registered),
skipped=len(skipped),
files=registered,
)
class CreateJob(graphene.Mutation):
class Arguments:
input = CreateJobInput(required=True)
Output = TranscodeJobType
def mutate(self, info, input):
from pathlib import Path
from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset
try:
source = MediaAsset.objects.get(id=input.source_asset_id)
except MediaAsset.DoesNotExist:
raise Exception("Source asset not found")
preset = None
preset_snapshot = {}
if input.preset_id:
try:
preset = TranscodePreset.objects.get(id=input.preset_id)
preset_snapshot = {
"name": preset.name,
"container": preset.container,
"video_codec": preset.video_codec,
"audio_codec": preset.audio_codec,
}
except TranscodePreset.DoesNotExist:
raise Exception("Preset not found")
if not preset and not input.trim_start and not input.trim_end:
raise Exception("Must specify preset_id or trim_start/trim_end")
output_filename = input.output_filename
if not output_filename:
stem = Path(source.filename).stem
ext = preset_snapshot.get("container", "mp4") if preset else "mp4"
output_filename = f"{stem}_output.{ext}"
job = TranscodeJob.objects.create(
source_asset_id=source.id,
preset_id=preset.id if preset else None,
preset_snapshot=preset_snapshot,
trim_start=input.trim_start,
trim_end=input.trim_end,
output_filename=output_filename,
output_path=output_filename,
priority=input.priority or 0,
)
# Dispatch
executor_mode = os.environ.get("MPR_EXECUTOR", "local")
if executor_mode == "lambda":
from task.executor import get_executor
get_executor().run(
job_id=str(job.id),
source_path=source.file_path,
output_path=output_filename,
preset=preset_snapshot or None,
trim_start=input.trim_start,
trim_end=input.trim_end,
duration=source.duration,
)
else:
from task.tasks import run_transcode_job
result = run_transcode_job.delay(
job_id=str(job.id),
source_key=source.file_path,
output_key=output_filename,
preset=preset_snapshot or None,
trim_start=input.trim_start,
trim_end=input.trim_end,
duration=source.duration,
)
job.celery_task_id = result.id
job.save(update_fields=["celery_task_id"])
return job
class CancelJob(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
Output = TranscodeJobType
def mutate(self, info, id):
from mpr.media_assets.models import TranscodeJob
try:
job = TranscodeJob.objects.get(id=id)
except TranscodeJob.DoesNotExist:
raise Exception("Job not found")
if job.status not in ("pending", "processing"):
raise Exception(f"Cannot cancel job with status: {job.status}")
job.status = "cancelled"
job.save(update_fields=["status"])
return job
class RetryJob(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
Output = TranscodeJobType
def mutate(self, info, id):
from mpr.media_assets.models import TranscodeJob
try:
job = TranscodeJob.objects.get(id=id)
except TranscodeJob.DoesNotExist:
raise Exception("Job not found")
if job.status != "failed":
raise Exception("Only failed jobs can be retried")
job.status = "pending"
job.progress = 0
job.error_message = None
job.save(update_fields=["status", "progress", "error_message"])
return job
class UpdateAsset(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
input = UpdateAssetInput(required=True)
Output = MediaAssetType
def mutate(self, info, id, input):
from mpr.media_assets.models import MediaAsset
try:
asset = MediaAsset.objects.get(id=id)
except MediaAsset.DoesNotExist:
raise Exception("Asset not found")
update_fields = []
if input.comments is not None:
asset.comments = input.comments
update_fields.append("comments")
if input.tags is not None:
asset.tags = input.tags
update_fields.append("tags")
if update_fields:
asset.save(update_fields=update_fields)
return asset
class DeleteAsset(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
Output = DeleteResultType
def mutate(self, info, id):
from mpr.media_assets.models import MediaAsset
try:
asset = MediaAsset.objects.get(id=id)
asset.delete()
return DeleteResultType(ok=True)
except MediaAsset.DoesNotExist:
raise Exception("Asset not found")
class Mutation(graphene.ObjectType):
scan_media_folder = ScanMediaFolder.Field()
create_job = CreateJob.Field()
cancel_job = CancelJob.Field()
retry_job = RetryJob.Field()
update_asset = UpdateAsset.Field()
delete_asset = DeleteAsset.Field()
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
schema = graphene.Schema(query=Query, mutation=Mutation)

View File

@@ -1,142 +0,0 @@
"""
Graphene Types - GENERATED FILE
Do not edit directly. Regenerate using modelgen.
"""
import graphene
class AssetStatus(graphene.Enum):
PENDING = "pending"
READY = "ready"
ERROR = "error"
class JobStatus(graphene.Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class MediaAssetType(graphene.ObjectType):
"""A video/audio file registered in the system."""
id = graphene.UUID()
filename = graphene.String()
file_path = graphene.String()
status = graphene.String()
error_message = graphene.String()
file_size = graphene.Int()
duration = graphene.Float()
video_codec = graphene.String()
audio_codec = graphene.String()
width = graphene.Int()
height = graphene.Int()
framerate = graphene.Float()
bitrate = graphene.Int()
properties = graphene.JSONString()
comments = graphene.String()
tags = graphene.List(graphene.String)
created_at = graphene.DateTime()
updated_at = graphene.DateTime()
class TranscodePresetType(graphene.ObjectType):
"""A reusable transcoding configuration (like Handbrake presets)."""
id = graphene.UUID()
name = graphene.String()
description = graphene.String()
is_builtin = graphene.Boolean()
container = graphene.String()
video_codec = graphene.String()
video_bitrate = graphene.String()
video_crf = graphene.Int()
video_preset = graphene.String()
resolution = graphene.String()
framerate = graphene.Float()
audio_codec = graphene.String()
audio_bitrate = graphene.String()
audio_channels = graphene.Int()
audio_samplerate = graphene.Int()
extra_args = graphene.List(graphene.String)
created_at = graphene.DateTime()
updated_at = graphene.DateTime()
class TranscodeJobType(graphene.ObjectType):
"""A transcoding or trimming job in the queue."""
id = graphene.UUID()
source_asset_id = graphene.UUID()
preset_id = graphene.UUID()
preset_snapshot = graphene.JSONString()
trim_start = graphene.Float()
trim_end = graphene.Float()
output_filename = graphene.String()
output_path = graphene.String()
output_asset_id = graphene.UUID()
status = graphene.String()
progress = graphene.Float()
current_frame = graphene.Int()
current_time = graphene.Float()
speed = graphene.String()
error_message = graphene.String()
celery_task_id = graphene.String()
execution_arn = graphene.String()
priority = graphene.Int()
created_at = graphene.DateTime()
started_at = graphene.DateTime()
completed_at = graphene.DateTime()
class CreateJobInput(graphene.InputObjectType):
"""Request body for creating a transcode/trim job."""
source_asset_id = graphene.UUID(required=True)
preset_id = graphene.UUID()
trim_start = graphene.Float()
trim_end = graphene.Float()
output_filename = graphene.String()
priority = graphene.Int(default_value=0)
class UpdateAssetInput(graphene.InputObjectType):
"""Request body for updating asset metadata."""
comments = graphene.String()
tags = graphene.List(graphene.String)
class SystemStatusType(graphene.ObjectType):
"""System status response."""
status = graphene.String()
version = graphene.String()
class ScanResultType(graphene.ObjectType):
"""Result of scanning the media input bucket."""
found = graphene.Int()
registered = graphene.Int()
skipped = graphene.Int()
files = graphene.List(graphene.String)
class DeleteResultType(graphene.ObjectType):
"""Result of a delete operation."""
ok = graphene.Boolean()
class WorkerStatusType(graphene.ObjectType):
"""Worker health and capabilities."""
available = graphene.Boolean()
active_jobs = graphene.Int()
supported_codecs = graphene.List(graphene.String)
gpu_available = graphene.Boolean()

273
core/api/graphql.py Normal file
View File

@@ -0,0 +1,273 @@
"""
GraphQL API using strawberry, served via FastAPI.
Primary API for MPR — all client interactions go through GraphQL.
Uses core.db for data access.
Types are generated from schema/ via modelgen — see api/schema/graphql.py.
"""
import os
from typing import List, Optional
from uuid import UUID
import strawberry
from strawberry.schema.config import StrawberryConfig
from strawberry.types import Info
from core.api.schema.graphql import (
CreateJobInput,
DeleteResultType,
MediaAssetType,
ScanResultType,
SystemStatusType,
TranscodeJobType,
TranscodePresetType,
UpdateAssetInput,
)
from core.storage import BUCKET_IN, list_objects
VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"}
AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"}
MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS
# ---------------------------------------------------------------------------
# Queries
# ---------------------------------------------------------------------------
@strawberry.type
class Query:
@strawberry.field
def assets(
self,
info: Info,
status: Optional[str] = None,
search: Optional[str] = None,
) -> List[MediaAssetType]:
from core.db import list_assets
return list_assets(status=status, search=search)
@strawberry.field
def asset(self, info: Info, id: UUID) -> Optional[MediaAssetType]:
from core.db import get_asset
try:
return get_asset(id)
except Exception:
return None
@strawberry.field
def jobs(
self,
info: Info,
status: Optional[str] = None,
source_asset_id: Optional[UUID] = None,
) -> List[TranscodeJobType]:
from core.db import list_jobs
return list_jobs(status=status, source_asset_id=source_asset_id)
@strawberry.field
def job(self, info: Info, id: UUID) -> Optional[TranscodeJobType]:
from core.db import get_job
try:
return get_job(id)
except Exception:
return None
@strawberry.field
def presets(self, info: Info) -> List[TranscodePresetType]:
from core.db import list_presets
return list_presets()
@strawberry.field
def system_status(self, info: Info) -> SystemStatusType:
return SystemStatusType(status="ok", version="0.1.0")
# ---------------------------------------------------------------------------
# Mutations
# ---------------------------------------------------------------------------
@strawberry.type
class Mutation:
@strawberry.mutation
def scan_media_folder(self, info: Info) -> ScanResultType:
from core.db import create_asset, get_asset_filenames
objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS)
existing = get_asset_filenames()
registered = []
skipped = []
for obj in objects:
if obj["filename"] in existing:
skipped.append(obj["filename"])
continue
try:
create_asset(
filename=obj["filename"],
file_path=obj["key"],
file_size=obj["size"],
)
registered.append(obj["filename"])
except Exception:
pass
return ScanResultType(
found=len(objects),
registered=len(registered),
skipped=len(skipped),
files=registered,
)
@strawberry.mutation
def create_job(self, info: Info, input: CreateJobInput) -> TranscodeJobType:
from pathlib import Path
from core.db import create_job, get_asset, get_preset
try:
source = get_asset(input.source_asset_id)
except Exception:
raise Exception("Source asset not found")
preset = None
preset_snapshot = {}
if input.preset_id:
try:
preset = get_preset(input.preset_id)
preset_snapshot = {
"name": preset.name,
"container": preset.container,
"video_codec": preset.video_codec,
"audio_codec": preset.audio_codec,
}
except Exception:
raise Exception("Preset not found")
if not preset and not input.trim_start and not input.trim_end:
raise Exception("Must specify preset_id or trim_start/trim_end")
output_filename = input.output_filename
if not output_filename:
stem = Path(source.filename).stem
ext = preset_snapshot.get("container", "mp4") if preset else "mp4"
output_filename = f"{stem}_output.{ext}"
job = create_job(
source_asset_id=source.id,
preset_id=preset.id if preset else None,
preset_snapshot=preset_snapshot,
trim_start=input.trim_start,
trim_end=input.trim_end,
output_filename=output_filename,
output_path=output_filename,
priority=input.priority or 0,
)
executor_mode = os.environ.get("MPR_EXECUTOR", "local")
if executor_mode in ("lambda", "gcp"):
from core.task.executor import get_executor
get_executor().run(
job_id=str(job.id),
source_path=source.file_path,
output_path=output_filename,
preset=preset_snapshot or None,
trim_start=input.trim_start,
trim_end=input.trim_end,
duration=source.duration,
)
else:
from core.task.tasks import run_transcode_job
result = run_transcode_job.delay(
job_id=str(job.id),
source_key=source.file_path,
output_key=output_filename,
preset=preset_snapshot or None,
trim_start=input.trim_start,
trim_end=input.trim_end,
duration=source.duration,
)
job.celery_task_id = result.id
job.save(update_fields=["celery_task_id"])
return job
@strawberry.mutation
def cancel_job(self, info: Info, id: UUID) -> TranscodeJobType:
from core.db import get_job, update_job
try:
job = get_job(id)
except Exception:
raise Exception("Job not found")
if job.status not in ("pending", "processing"):
raise Exception(f"Cannot cancel job with status: {job.status}")
return update_job(job, status="cancelled")
@strawberry.mutation
def retry_job(self, info: Info, id: UUID) -> TranscodeJobType:
from core.db import get_job, update_job
try:
job = get_job(id)
except Exception:
raise Exception("Job not found")
if job.status != "failed":
raise Exception("Only failed jobs can be retried")
return update_job(job, status="pending", progress=0, error_message=None)
@strawberry.mutation
def update_asset(self, info: Info, id: UUID, input: UpdateAssetInput) -> MediaAssetType:
from core.db import get_asset, update_asset
try:
asset = get_asset(id)
except Exception:
raise Exception("Asset not found")
fields = {}
if input.comments is not None:
fields["comments"] = input.comments
if input.tags is not None:
fields["tags"] = input.tags
if fields:
asset = update_asset(asset, **fields)
return asset
@strawberry.mutation
def delete_asset(self, info: Info, id: UUID) -> DeleteResultType:
from core.db import delete_asset, get_asset
try:
asset = get_asset(id)
delete_asset(asset)
return DeleteResultType(ok=True)
except Exception:
raise Exception("Asset not found")
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
schema = strawberry.Schema(
query=Query,
mutation=Mutation,
config=StrawberryConfig(auto_camel_case=False),
)

View File

@@ -10,10 +10,10 @@ from typing import Optional
from uuid import UUID
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
# Initialize Django before importing models
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mpr.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "admin.mpr.settings")
import django
@@ -21,9 +21,9 @@ django.setup()
from fastapi import FastAPI, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from starlette_graphene3 import GraphQLApp, make_graphiql_handler
from strawberry.fastapi import GraphQLRouter
from api.graphql import schema as graphql_schema
from core.api.graphql import schema as graphql_schema
CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "")
@@ -45,7 +45,8 @@ app.add_middleware(
)
# GraphQL
app.mount("/graphql", GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler()))
graphql_router = GraphQLRouter(schema=graphql_schema, graphql_ide="graphiql")
app.include_router(graphql_router, prefix="/graphql")
@app.get("/")
@@ -73,26 +74,25 @@ def job_callback(
from django.utils import timezone
from mpr.media_assets.models import TranscodeJob
from core.db import get_job, update_job
try:
job = TranscodeJob.objects.get(id=job_id)
except TranscodeJob.DoesNotExist:
job = get_job(job_id)
except Exception:
raise HTTPException(status_code=404, detail="Job not found")
status = payload.get("status", "failed")
job.status = status
job.progress = 100.0 if status == "completed" else job.progress
update_fields = ["status", "progress"]
fields = {
"status": status,
"progress": 100.0 if status == "completed" else job.progress,
}
if payload.get("error"):
job.error_message = payload["error"]
update_fields.append("error_message")
fields["error_message"] = payload["error"]
if status in ("completed", "failed"):
job.completed_at = timezone.now()
update_fields.append("completed_at")
fields["completed_at"] = timezone.now()
job.save(update_fields=update_fields)
update_job(job, **fields)
return {"ok": True}

158
core/api/schema/graphql.py Normal file
View File

@@ -0,0 +1,158 @@
"""
Strawberry Types - GENERATED FILE
Do not edit directly. Regenerate using modelgen.
"""
import strawberry
from enum import Enum
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from strawberry.scalars import JSON
@strawberry.enum
class AssetStatus(Enum):
PENDING = "pending"
READY = "ready"
ERROR = "error"
@strawberry.enum
class JobStatus(Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@strawberry.type
class MediaAssetType:
"""A video/audio file registered in the system."""
id: Optional[UUID] = None
filename: Optional[str] = None
file_path: Optional[str] = None
status: Optional[str] = None
error_message: Optional[str] = None
file_size: Optional[int] = None
duration: Optional[float] = None
video_codec: Optional[str] = None
audio_codec: Optional[str] = None
width: Optional[int] = None
height: Optional[int] = None
framerate: Optional[float] = None
bitrate: Optional[int] = None
properties: Optional[JSON] = None
comments: Optional[str] = None
tags: Optional[List[str]] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@strawberry.type
class TranscodePresetType:
"""A reusable transcoding configuration (like Handbrake presets)."""
id: Optional[UUID] = None
name: Optional[str] = None
description: Optional[str] = None
is_builtin: Optional[bool] = None
container: Optional[str] = None
video_codec: Optional[str] = None
video_bitrate: Optional[str] = None
video_crf: Optional[int] = None
video_preset: Optional[str] = None
resolution: Optional[str] = None
framerate: Optional[float] = None
audio_codec: Optional[str] = None
audio_bitrate: Optional[str] = None
audio_channels: Optional[int] = None
audio_samplerate: Optional[int] = None
extra_args: Optional[List[str]] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@strawberry.type
class TranscodeJobType:
"""A transcoding or trimming job in the queue."""
id: Optional[UUID] = None
source_asset_id: Optional[UUID] = None
preset_id: Optional[UUID] = None
preset_snapshot: Optional[JSON] = None
trim_start: Optional[float] = None
trim_end: Optional[float] = None
output_filename: Optional[str] = None
output_path: Optional[str] = None
output_asset_id: Optional[UUID] = None
status: Optional[str] = None
progress: Optional[float] = None
current_frame: Optional[int] = None
current_time: Optional[float] = None
speed: Optional[str] = None
error_message: Optional[str] = None
celery_task_id: Optional[str] = None
execution_arn: Optional[str] = None
priority: Optional[int] = None
created_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
@strawberry.input
class CreateJobInput:
"""Request body for creating a transcode/trim job."""
source_asset_id: UUID
preset_id: Optional[UUID] = None
trim_start: Optional[float] = None
trim_end: Optional[float] = None
output_filename: Optional[str] = None
priority: int = 0
@strawberry.input
class UpdateAssetInput:
"""Request body for updating asset metadata."""
comments: Optional[str] = None
tags: Optional[List[str]] = None
@strawberry.type
class SystemStatusType:
"""System status response."""
status: Optional[str] = None
version: Optional[str] = None
@strawberry.type
class ScanResultType:
"""Result of scanning the media input bucket."""
found: Optional[int] = None
registered: Optional[int] = None
skipped: Optional[int] = None
files: Optional[List[str]] = None
@strawberry.type
class DeleteResultType:
"""Result of a delete operation."""
ok: Optional[bool] = None
@strawberry.type
class WorkerStatusType:
"""Worker health and capabilities."""
available: Optional[bool] = None
active_jobs: Optional[int] = None
supported_codecs: Optional[List[str]] = None
gpu_available: Optional[bool] = None

19
core/db/__init__.py Normal file
View File

@@ -0,0 +1,19 @@
from .assets import (
create_asset,
delete_asset,
get_asset,
get_asset_filenames,
list_assets,
update_asset,
)
from .jobs import (
create_job,
get_job,
list_jobs,
update_job,
update_job_fields,
)
from .presets import (
get_preset,
list_presets,
)

48
core/db/assets.py Normal file
View File

@@ -0,0 +1,48 @@
"""Database operations for MediaAsset."""
from typing import Optional
from uuid import UUID
def list_assets(status: Optional[str] = None, search: Optional[str] = None):
from admin.mpr.media_assets.models import MediaAsset
qs = MediaAsset.objects.all()
if status:
qs = qs.filter(status=status)
if search:
qs = qs.filter(filename__icontains=search)
return list(qs)
def get_asset(id: UUID):
from admin.mpr.media_assets.models import MediaAsset
return MediaAsset.objects.get(id=id)
def get_asset_filenames() -> set[str]:
from admin.mpr.media_assets.models import MediaAsset
return set(MediaAsset.objects.values_list("filename", flat=True))
def create_asset(*, filename: str, file_path: str, file_size: int):
from admin.mpr.media_assets.models import MediaAsset
return MediaAsset.objects.create(
filename=filename,
file_path=file_path,
file_size=file_size,
)
def update_asset(asset, **fields):
for key, value in fields.items():
setattr(asset, key, value)
asset.save(update_fields=list(fields.keys()))
return asset
def delete_asset(asset):
asset.delete()

40
core/db/jobs.py Normal file
View File

@@ -0,0 +1,40 @@
"""Database operations for TranscodeJob."""
from typing import Optional
from uuid import UUID
def list_jobs(status: Optional[str] = None, source_asset_id: Optional[UUID] = None):
from admin.mpr.media_assets.models import TranscodeJob
qs = TranscodeJob.objects.all()
if status:
qs = qs.filter(status=status)
if source_asset_id:
qs = qs.filter(source_asset_id=source_asset_id)
return list(qs)
def get_job(id: UUID):
from admin.mpr.media_assets.models import TranscodeJob
return TranscodeJob.objects.get(id=id)
def create_job(**fields):
from admin.mpr.media_assets.models import TranscodeJob
return TranscodeJob.objects.create(**fields)
def update_job(job, **fields):
for key, value in fields.items():
setattr(job, key, value)
job.save(update_fields=list(fields.keys()))
return job
def update_job_fields(job_id, **fields):
from admin.mpr.media_assets.models import TranscodeJob
TranscodeJob.objects.filter(id=job_id).update(**fields)

15
core/db/presets.py Normal file
View File

@@ -0,0 +1,15 @@
"""Database operations for TranscodePreset."""
from uuid import UUID
def list_presets():
from admin.mpr.media_assets.models import TranscodePreset
return list(TranscodePreset.objects.all())
def get_preset(id: UUID):
from admin.mpr.media_assets.models import TranscodePreset
return TranscodePreset.objects.get(id=id)

View File

@@ -59,7 +59,7 @@ class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer):
# Dispatch to Celery if available
if self.celery_app:
from task.tasks import run_transcode_job
from core.task.tasks import run_transcode_job
task = run_transcode_job.delay(
job_id=job_id,
@@ -219,9 +219,8 @@ def update_job_progress(
try:
from django.utils import timezone
from mpr.media_assets.models import TranscodeJob
from core.db import update_job_fields
update_fields = ["progress", "current_frame", "current_time", "speed", "status"]
updates = {
"progress": progress,
"current_frame": current_frame,
@@ -232,16 +231,13 @@ def update_job_progress(
if error:
updates["error_message"] = error
update_fields.append("error_message")
if status == "processing":
updates["started_at"] = timezone.now()
update_fields.append("started_at")
elif status in ("completed", "failed"):
updates["completed_at"] = timezone.now()
update_fields.append("completed_at")
TranscodeJob.objects.filter(id=job_id).update(**updates)
update_job_fields(job_id, **updates)
except Exception as e:
logger.warning(f"Failed to update job {job_id} in DB: {e}")

View File

@@ -1,14 +1,14 @@
{
"schema": "schema/models",
"schema": "core/schema/models",
"targets": [
{
"target": "django",
"output": "mpr/media_assets/models.py",
"output": "admin/mpr/media_assets/models.py",
"include": ["dataclasses", "enums"]
},
{
"target": "graphene",
"output": "api/schema/graphql.py",
"output": "core/api/schema/graphql.py",
"include": ["dataclasses", "enums", "api"]
},
{
@@ -18,7 +18,7 @@
},
{
"target": "protobuf",
"output": "rpc/protos/worker.proto",
"output": "core/rpc/protos/worker.proto",
"include": ["grpc"]
}
]

10
core/storage/__init__.py Normal file
View File

@@ -0,0 +1,10 @@
from .s3 import (
BUCKET_IN,
BUCKET_OUT,
download_file,
download_to_temp,
get_presigned_url,
get_s3_client,
list_objects,
upload_file,
)

1
core/storage/gcp.py Normal file
View File

@@ -0,0 +1 @@
"""GCP Cloud Storage backend (placeholder)."""

1
core/storage/local.py Normal file
View File

@@ -0,0 +1 @@
"""Local filesystem storage backend (placeholder)."""

View File

@@ -156,8 +156,81 @@ class LambdaExecutor(Executor):
# Store execution ARN on the job
execution_arn = response["executionArn"]
try:
from mpr.media_assets.models import TranscodeJob
TranscodeJob.objects.filter(id=job_id).update(execution_arn=execution_arn)
from core.db import update_job_fields
update_job_fields(job_id, execution_arn=execution_arn)
except Exception:
pass
return True
class GCPExecutor(Executor):
"""Execute jobs via Google Cloud Run Jobs."""
def __init__(self):
from google.cloud import run_v2
self.client = run_v2.JobsClient()
self.project_id = os.environ["GCP_PROJECT_ID"]
self.region = os.environ.get("GCP_REGION", "us-central1")
self.job_name = os.environ["CLOUD_RUN_JOB"]
self.callback_url = os.environ.get("CALLBACK_URL", "")
self.callback_api_key = os.environ.get("CALLBACK_API_KEY", "")
def run(
self,
job_id: str,
source_path: str,
output_path: str,
preset: Optional[Dict[str, Any]] = None,
trim_start: Optional[float] = None,
trim_end: Optional[float] = None,
duration: Optional[float] = None,
progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
) -> bool:
"""Trigger a Cloud Run Job execution for this job."""
import json
from google.cloud import run_v2
payload = {
"job_id": job_id,
"source_key": source_path,
"output_key": output_path,
"preset": preset,
"trim_start": trim_start,
"trim_end": trim_end,
"duration": duration,
"callback_url": self.callback_url,
"api_key": self.callback_api_key,
}
job_path = (
f"projects/{self.project_id}/locations/{self.region}/jobs/{self.job_name}"
)
request = run_v2.RunJobRequest(
name=job_path,
overrides=run_v2.RunJobRequest.Overrides(
container_overrides=[
run_v2.RunJobRequest.Overrides.ContainerOverride(
env=[
run_v2.EnvVar(
name="MPR_JOB_PAYLOAD", value=json.dumps(payload)
)
]
)
]
),
)
operation = self.client.run_job(request=request)
execution_name = operation.metadata.name
try:
from core.db import update_job_fields
update_job_fields(job_id, execution_arn=execution_name)
except Exception:
pass
@@ -168,6 +241,7 @@ class LambdaExecutor(Executor):
_executors: Dict[str, type] = {
"local": LocalExecutor,
"lambda": LambdaExecutor,
"gcp": GCPExecutor,
}
_executor_instance: Optional[Executor] = None

121
core/task/gcp_handler.py Normal file
View File

@@ -0,0 +1,121 @@
"""
Google Cloud Run Job handler for media transcoding.
Reads job payload from the MPR_JOB_PAYLOAD env var (injected by GCPExecutor),
downloads source from S3-compatible storage (GCS via HMAC + S3 API),
runs FFmpeg, uploads result, and calls back to the API.
Uses core/storage and core/ffmpeg — same modules as the Celery worker.
No cloud-provider SDK required here; storage goes through core.storage (boto3 + S3 compat).
Entry point: python -m task.gcp_handler (set as Cloud Run Job command)
"""
import json
import logging
import os
import sys
import tempfile
from pathlib import Path
import requests
from core.ffmpeg.transcode import TranscodeConfig, transcode
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def main() -> None:
raw = os.environ.get("MPR_JOB_PAYLOAD")
if not raw:
logger.error("MPR_JOB_PAYLOAD not set")
sys.exit(1)
event = json.loads(raw)
job_id = event["job_id"]
source_key = event["source_key"]
output_key = event["output_key"]
preset = event.get("preset")
trim_start = event.get("trim_start")
trim_end = event.get("trim_end")
duration = event.get("duration")
callback_url = event.get("callback_url", "")
api_key = event.get("api_key", "")
logger.info(f"Starting job {job_id}: {source_key} -> {output_key}")
tmp_source = download_to_temp(BUCKET_IN, source_key)
ext_out = Path(output_key).suffix or ".mp4"
fd, tmp_output = tempfile.mkstemp(suffix=ext_out)
os.close(fd)
try:
if preset:
config = TranscodeConfig(
input_path=tmp_source,
output_path=tmp_output,
video_codec=preset.get("video_codec", "libx264"),
video_bitrate=preset.get("video_bitrate"),
video_crf=preset.get("video_crf"),
video_preset=preset.get("video_preset"),
resolution=preset.get("resolution"),
framerate=preset.get("framerate"),
audio_codec=preset.get("audio_codec", "aac"),
audio_bitrate=preset.get("audio_bitrate"),
audio_channels=preset.get("audio_channels"),
audio_samplerate=preset.get("audio_samplerate"),
container=preset.get("container", "mp4"),
extra_args=preset.get("extra_args", []),
trim_start=trim_start,
trim_end=trim_end,
)
else:
config = TranscodeConfig(
input_path=tmp_source,
output_path=tmp_output,
video_codec="copy",
audio_codec="copy",
trim_start=trim_start,
trim_end=trim_end,
)
success = transcode(config, duration=duration)
if not success:
raise RuntimeError("Transcode returned False")
logger.info(f"Uploading to {BUCKET_OUT}/{output_key}")
upload_file(tmp_output, BUCKET_OUT, output_key)
_callback(callback_url, job_id, api_key, {"status": "completed"})
logger.info(f"Job {job_id} completed")
sys.exit(0)
except Exception as e:
logger.exception(f"Job {job_id} failed: {e}")
_callback(callback_url, job_id, api_key, {"status": "failed", "error": str(e)})
sys.exit(1)
finally:
for f in [tmp_source, tmp_output]:
try:
os.unlink(f)
except OSError:
pass
def _callback(callback_url: str, job_id: str, api_key: str, payload: dict) -> None:
if not callback_url:
return
try:
url = f"{callback_url}/jobs/{job_id}/callback"
headers = {"X-API-Key": api_key} if api_key else {}
resp = requests.post(url, json=payload, headers=headers, timeout=10)
logger.info(f"Callback response: {resp.status_code}")
except Exception as e:
logger.warning(f"Callback failed: {e}")
if __name__ == "__main__":
main()

View File

@@ -9,8 +9,8 @@ from typing import Any, Dict, Optional
from celery import shared_task
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
from rpc.server import update_job_progress
from task.executor import get_executor
from core.rpc.server import update_job_progress
from core.task.executor import get_executor
logger = logging.getLogger(__name__)

View File

@@ -16,7 +16,7 @@ REDIS_URL=redis://redis:6379/0
# Django
DEBUG=1
DJANGO_SETTINGS_MODULE=mpr.settings
DJANGO_SETTINGS_MODULE=admin.mpr.settings
SECRET_KEY=change-this-in-production
# Worker

View File

@@ -1,9 +1,5 @@
FROM python:3.11-slim
RUN apt-get update && apt-get install -y \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
@@ -11,4 +7,4 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "manage.py", "runserver", "0.0.0.0:8000"]
CMD ["python", "admin/manage.py", "runserver", "0.0.0.0:8000"]

14
ctrl/Dockerfile.worker Normal file
View File

@@ -0,0 +1,14 @@
FROM python:3.11-slim
RUN apt-get update && apt-get install -y \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt requirements-worker.txt ./
RUN pip install --no-cache-dir -r requirements-worker.txt
COPY . .
CMD ["celery", "-A", "admin.mpr", "worker", "--loglevel=info"]

View File

@@ -1,7 +1,7 @@
x-common-env: &common-env
DATABASE_URL: postgresql://mpr_user:mpr_pass@postgres:5432/mpr
REDIS_URL: redis://redis:6379/0
DJANGO_SETTINGS_MODULE: mpr.settings
DJANGO_SETTINGS_MODULE: admin.mpr.settings
DEBUG: 1
GRPC_HOST: grpc
GRPC_PORT: 50051
@@ -96,9 +96,9 @@ services:
context: ..
dockerfile: ctrl/Dockerfile
command: >
bash -c "python manage.py migrate &&
python manage.py loadbuiltins || true &&
python manage.py runserver 0.0.0.0:8701"
bash -c "python admin/manage.py migrate &&
python admin/manage.py loadbuiltins || true &&
python admin/manage.py runserver 0.0.0.0:8701"
ports:
- "8701:8701"
environment:
@@ -115,11 +115,12 @@ services:
build:
context: ..
dockerfile: ctrl/Dockerfile
command: uvicorn api.main:app --host 0.0.0.0 --port 8702 --reload
command: uvicorn core.api.main:app --host 0.0.0.0 --port 8702 --reload
ports:
- "8702:8702"
environment:
<<: *common-env
DJANGO_ALLOW_ASYNC_UNSAFE: "true"
volumes:
- ..:/app
depends_on:
@@ -132,7 +133,7 @@ services:
build:
context: ..
dockerfile: ctrl/Dockerfile
command: python -m rpc.server
command: python -m core.rpc.server
ports:
- "50052:50051"
environment:
@@ -150,8 +151,8 @@ services:
celery:
build:
context: ..
dockerfile: ctrl/Dockerfile
command: celery -A mpr worker -l info -Q transcode -c 2
dockerfile: ctrl/Dockerfile.worker
command: celery -A admin.mpr worker -l info -Q transcode -c 2
environment:
<<: *common-env
MPR_EXECUTOR: local

View File

@@ -1,22 +1,22 @@
#!/bin/bash
# Model generation script for MPR
# Generates all targets from schema/modelgen.json config
# Generates all targets from core/schema/modelgen.json config
set -e
cd "$(dirname "$0")/.."
echo "Generating models from schema/models..."
python -m modelgen generate --config schema/modelgen.json
echo "Generating models from core/schema/models..."
python -m modelgen generate --config core/schema/modelgen.json
# Generate gRPC stubs from proto
echo "Generating gRPC stubs..."
python -m grpc_tools.protoc \
-I rpc/protos \
--python_out=rpc \
--grpc_python_out=rpc \
rpc/protos/worker.proto
-I core/rpc/protos \
--python_out=core/rpc \
--grpc_python_out=core/rpc \
core/rpc/protos/worker.proto
# Fix relative import in generated grpc stub
sed -i 's/^import worker_pb2/from . import worker_pb2/' rpc/worker_pb2_grpc.py
sed -i 's/^import worker_pb2/from . import worker_pb2/' core/rpc/worker_pb2_grpc.py
echo "Done!"

View File

@@ -14,8 +14,8 @@ COPY ctrl/lambda/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY task/lambda_handler.py ${LAMBDA_TASK_ROOT}/task/lambda_handler.py
COPY task/__init__.py ${LAMBDA_TASK_ROOT}/task/__init__.py
COPY core/task/lambda_handler.py ${LAMBDA_TASK_ROOT}/core/task/lambda_handler.py
COPY core/task/__init__.py ${LAMBDA_TASK_ROOT}/core/task/__init__.py
COPY core/ ${LAMBDA_TASK_ROOT}/core/
CMD ["task.lambda_handler.handler"]
CMD ["core.task.lambda_handler.handler"]

View File

@@ -44,9 +44,9 @@ http {
proxy_set_header Host $host;
}
# FastAPI
# FastAPI — trailing slash strips /api prefix before forwarding
location /api/ {
proxy_pass http://fastapi;
proxy_pass http://fastapi/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

View File

@@ -0,0 +1,83 @@
digraph gcp_architecture {
rankdir=TB
node [shape=box, style=rounded, fontname="Helvetica"]
edge [fontname="Helvetica", fontsize=10]
labelloc="t"
label="MPR - GCP Architecture (Cloud Run Jobs + GCS)"
fontsize=16
fontname="Helvetica-Bold"
graph [splines=ortho, nodesep=0.8, ranksep=0.8]
// External
subgraph cluster_external {
label="External"
style=dashed
color=gray
browser [label="Browser\nmpr.mcrn.ar", shape=ellipse]
}
// Nginx reverse proxy
subgraph cluster_proxy {
label="Reverse Proxy"
style=filled
fillcolor="#e8f4f8"
nginx [label="nginx\nport 80"]
}
// Application layer
subgraph cluster_apps {
label="Application Layer"
style=filled
fillcolor="#f0f8e8"
django [label="Django Admin\n/admin\nport 8701"]
fastapi [label="GraphQL API\n/graphql\nport 8702"]
timeline [label="Timeline UI\n/\nport 5173"]
}
// Data layer (still local)
subgraph cluster_data {
label="Data Layer"
style=filled
fillcolor="#f8e8f0"
postgres [label="PostgreSQL\nport 5436", shape=cylinder]
}
// GCP layer
subgraph cluster_gcp {
label="Google Cloud"
style=filled
fillcolor="#e8f0fd"
cloud_run_job [label="Cloud Run Job\nFFmpeg container\ntranscoding"]
gcs [label="GCS Buckets\n(S3-compat API)", shape=folder]
bucket_in [label="mpr-media-in/\ninput videos", shape=note]
bucket_out [label="mpr-media-out/\ntranscoded output", shape=note]
}
// Connections
browser -> nginx [label="HTTP"]
nginx -> django [xlabel="/admin"]
nginx -> fastapi [xlabel="/graphql"]
nginx -> timeline [xlabel="/"]
timeline -> fastapi [label="GraphQL"]
django -> postgres
fastapi -> postgres [label="read/write jobs"]
fastapi -> cloud_run_job [label="google-cloud-run\nrun_job() + payload\nexecution_name"]
cloud_run_job -> gcs [label="S3 compat (HMAC)\ndownload input\nupload output"]
cloud_run_job -> fastapi [label="POST /jobs/{id}/callback\nupdate status"]
fastapi -> postgres [label="callback updates\njob status"]
gcs -> bucket_in [style=dotted, arrowhead=none]
gcs -> bucket_out [style=dotted, arrowhead=none]
}

View File

@@ -0,0 +1,210 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<!-- Generated by graphviz version 14.1.2 (0)
-->
<!-- Title: gcp_architecture Pages: 1 -->
<svg width="653pt" height="957pt"
viewBox="0.00 0.00 653.00 957.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 953.35)">
<title>gcp_architecture</title>
<polygon fill="white" stroke="none" points="-4,4 -4,-953.35 649.25,-953.35 649.25,4 -4,4"/>
<text xml:space="preserve" text-anchor="middle" x="322.62" y="-930.15" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">MPR &#45; GCP Architecture (Cloud Run Jobs + GCS)</text>
<g id="clust1" class="cluster">
<title>cluster_external</title>
<polygon fill="none" stroke="gray" stroke-dasharray="5,2" points="155,-810.25 155,-913.85 315,-913.85 315,-810.25 155,-810.25"/>
<text xml:space="preserve" text-anchor="middle" x="235" y="-894.65" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">External</text>
</g>
<g id="clust2" class="cluster">
<title>cluster_proxy</title>
<polygon fill="#e8f4f8" stroke="black" points="162,-682.5 162,-768.5 308,-768.5 308,-682.5 162,-682.5"/>
<text xml:space="preserve" text-anchor="middle" x="235" y="-749.3" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">Reverse Proxy</text>
</g>
<g id="clust3" class="cluster">
<title>cluster_apps</title>
<polygon fill="#f0f8e8" stroke="black" points="8,-418.75 8,-652.5 290,-652.5 290,-418.75 8,-418.75"/>
<text xml:space="preserve" text-anchor="middle" x="149" y="-633.3" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">Application Layer</text>
</g>
<g id="clust4" class="cluster">
<title>cluster_data</title>
<polygon fill="#f8e8f0" stroke="black" points="27,-248.91 27,-350.84 141,-350.84 141,-248.91 27,-248.91"/>
<text xml:space="preserve" text-anchor="middle" x="84" y="-331.64" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">Data Layer</text>
</g>
<g id="clust5" class="cluster">
<title>cluster_gcp</title>
<polygon fill="#e8f0fd" stroke="black" points="299,-8 299,-351.5 631,-351.5 631,-8 299,-8"/>
<text xml:space="preserve" text-anchor="middle" x="465" y="-332.3" font-family="Helvetica,sans-Serif" font-weight="bold" font-size="16.00">Google Cloud</text>
</g>
<!-- browser -->
<g id="node1" class="node">
<title>browser</title>
<ellipse fill="none" stroke="black" cx="235" cy="-848.3" rx="71.77" ry="30.05"/>
<text xml:space="preserve" text-anchor="middle" x="235" y="-852.25" font-family="Helvetica,sans-Serif" font-size="14.00">Browser</text>
<text xml:space="preserve" text-anchor="middle" x="235" y="-835" font-family="Helvetica,sans-Serif" font-size="14.00">mpr.mcrn.ar</text>
</g>
<!-- nginx -->
<g id="node2" class="node">
<title>nginx</title>
<path fill="none" stroke="black" d="M256.5,-733C256.5,-733 213.5,-733 213.5,-733 207.5,-733 201.5,-727 201.5,-721 201.5,-721 201.5,-702.5 201.5,-702.5 201.5,-696.5 207.5,-690.5 213.5,-690.5 213.5,-690.5 256.5,-690.5 256.5,-690.5 262.5,-690.5 268.5,-696.5 268.5,-702.5 268.5,-702.5 268.5,-721 268.5,-721 268.5,-727 262.5,-733 256.5,-733"/>
<text xml:space="preserve" text-anchor="middle" x="235" y="-715.7" font-family="Helvetica,sans-Serif" font-size="14.00">nginx</text>
<text xml:space="preserve" text-anchor="middle" x="235" y="-698.45" font-family="Helvetica,sans-Serif" font-size="14.00">port 80</text>
</g>
<!-- browser&#45;&gt;nginx -->
<g id="edge1" class="edge">
<title>browser&#45;&gt;nginx</title>
<path fill="none" stroke="black" d="M235,-818C235,-818 235,-745 235,-745"/>
<polygon fill="black" stroke="black" points="238.5,-745 235,-735 231.5,-745 238.5,-745"/>
<text xml:space="preserve" text-anchor="middle" x="247.75" y="-779.75" font-family="Helvetica,sans-Serif" font-size="10.00">HTTP</text>
</g>
<!-- django -->
<g id="node3" class="node">
<title>django</title>
<path fill="none" stroke="black" d="M117.75,-617C117.75,-617 28.25,-617 28.25,-617 22.25,-617 16.25,-611 16.25,-605 16.25,-605 16.25,-569.25 16.25,-569.25 16.25,-563.25 22.25,-557.25 28.25,-557.25 28.25,-557.25 117.75,-557.25 117.75,-557.25 123.75,-557.25 129.75,-563.25 129.75,-569.25 129.75,-569.25 129.75,-605 129.75,-605 129.75,-611 123.75,-617 117.75,-617"/>
<text xml:space="preserve" text-anchor="middle" x="73" y="-599.7" font-family="Helvetica,sans-Serif" font-size="14.00">Django Admin</text>
<text xml:space="preserve" text-anchor="middle" x="73" y="-582.45" font-family="Helvetica,sans-Serif" font-size="14.00">/admin</text>
<text xml:space="preserve" text-anchor="middle" x="73" y="-565.2" font-family="Helvetica,sans-Serif" font-size="14.00">port 8701</text>
</g>
<!-- nginx&#45;&gt;django -->
<g id="edge2" class="edge">
<title>nginx&#45;&gt;django</title>
<path fill="none" stroke="black" d="M201.04,-719C153.54,-719 73,-719 73,-719 73,-719 73,-628.89 73,-628.89"/>
<polygon fill="black" stroke="black" points="76.5,-628.89 73,-618.89 69.5,-628.89 76.5,-628.89"/>
<text xml:space="preserve" text-anchor="middle" x="75.09" y="-722.25" font-family="Helvetica,sans-Serif" font-size="10.00">/admin</text>
</g>
<!-- fastapi -->
<g id="node4" class="node">
<title>fastapi</title>
<path fill="none" stroke="black" d="M270.25,-486.5C270.25,-486.5 189.75,-486.5 189.75,-486.5 183.75,-486.5 177.75,-480.5 177.75,-474.5 177.75,-474.5 177.75,-438.75 177.75,-438.75 177.75,-432.75 183.75,-426.75 189.75,-426.75 189.75,-426.75 270.25,-426.75 270.25,-426.75 276.25,-426.75 282.25,-432.75 282.25,-438.75 282.25,-438.75 282.25,-474.5 282.25,-474.5 282.25,-480.5 276.25,-486.5 270.25,-486.5"/>
<text xml:space="preserve" text-anchor="middle" x="230" y="-469.2" font-family="Helvetica,sans-Serif" font-size="14.00">GraphQL API</text>
<text xml:space="preserve" text-anchor="middle" x="230" y="-451.95" font-family="Helvetica,sans-Serif" font-size="14.00">/graphql</text>
<text xml:space="preserve" text-anchor="middle" x="230" y="-434.7" font-family="Helvetica,sans-Serif" font-size="14.00">port 8702</text>
</g>
<!-- nginx&#45;&gt;fastapi -->
<g id="edge3" class="edge">
<title>nginx&#45;&gt;fastapi</title>
<path fill="none" stroke="black" d="M201.11,-705C191.15,-705 182.88,-705 182.88,-705 182.88,-705 182.88,-498.1 182.88,-498.1"/>
<polygon fill="black" stroke="black" points="186.38,-498.1 182.88,-488.1 179.38,-498.1 186.38,-498.1"/>
<text xml:space="preserve" text-anchor="middle" x="163" y="-613.91" font-family="Helvetica,sans-Serif" font-size="10.00">/graphql</text>
</g>
<!-- timeline -->
<g id="node5" class="node">
<title>timeline</title>
<path fill="none" stroke="black" d="M270,-617C270,-617 200,-617 200,-617 194,-617 188,-611 188,-605 188,-605 188,-569.25 188,-569.25 188,-563.25 194,-557.25 200,-557.25 200,-557.25 270,-557.25 270,-557.25 276,-557.25 282,-563.25 282,-569.25 282,-569.25 282,-605 282,-605 282,-611 276,-617 270,-617"/>
<text xml:space="preserve" text-anchor="middle" x="235" y="-599.7" font-family="Helvetica,sans-Serif" font-size="14.00">Timeline UI</text>
<text xml:space="preserve" text-anchor="middle" x="235" y="-582.45" font-family="Helvetica,sans-Serif" font-size="14.00">/</text>
<text xml:space="preserve" text-anchor="middle" x="235" y="-565.2" font-family="Helvetica,sans-Serif" font-size="14.00">port 5173</text>
</g>
<!-- nginx&#45;&gt;timeline -->
<g id="edge4" class="edge">
<title>nginx&#45;&gt;timeline</title>
<path fill="none" stroke="black" d="M235,-690.04C235,-690.04 235,-628.97 235,-628.97"/>
<polygon fill="black" stroke="black" points="238.5,-628.97 235,-618.97 231.5,-628.97 238.5,-628.97"/>
<text xml:space="preserve" text-anchor="middle" x="233.5" y="-662.75" font-family="Helvetica,sans-Serif" font-size="10.00">/</text>
</g>
<!-- postgres -->
<g id="node6" class="node">
<title>postgres</title>
<path fill="none" stroke="black" d="M131.75,-310.03C131.75,-312.96 110.35,-315.34 84,-315.34 57.65,-315.34 36.25,-312.96 36.25,-310.03 36.25,-310.03 36.25,-262.22 36.25,-262.22 36.25,-259.29 57.65,-256.91 84,-256.91 110.35,-256.91 131.75,-259.29 131.75,-262.22 131.75,-262.22 131.75,-310.03 131.75,-310.03"/>
<path fill="none" stroke="black" d="M131.75,-310.03C131.75,-307.1 110.35,-304.72 84,-304.72 57.65,-304.72 36.25,-307.1 36.25,-310.03"/>
<text xml:space="preserve" text-anchor="middle" x="84" y="-290.07" font-family="Helvetica,sans-Serif" font-size="14.00">PostgreSQL</text>
<text xml:space="preserve" text-anchor="middle" x="84" y="-272.82" font-family="Helvetica,sans-Serif" font-size="14.00">port 5436</text>
</g>
<!-- django&#45;&gt;postgres -->
<g id="edge6" class="edge">
<title>django&#45;&gt;postgres</title>
<path fill="none" stroke="black" d="M59.62,-556.89C59.62,-556.89 59.62,-326.97 59.62,-326.97"/>
<polygon fill="black" stroke="black" points="63.13,-326.97 59.63,-316.97 56.13,-326.97 63.13,-326.97"/>
</g>
<!-- fastapi&#45;&gt;postgres -->
<g id="edge7" class="edge">
<title>fastapi&#45;&gt;postgres</title>
<path fill="none" stroke="black" d="M177.34,-467C135.16,-467 83,-467 83,-467 83,-467 83,-327.1 83,-327.1"/>
<polygon fill="black" stroke="black" points="86.5,-327.1 83,-317.1 79.5,-327.1 86.5,-327.1"/>
<text xml:space="preserve" text-anchor="middle" x="266.38" y="-375.5" font-family="Helvetica,sans-Serif" font-size="10.00">read/write jobs</text>
</g>
<!-- fastapi&#45;&gt;postgres -->
<g id="edge11" class="edge">
<title>fastapi&#45;&gt;postgres</title>
<path fill="none" stroke="black" d="M177.57,-447C143.88,-447 106.38,-447 106.38,-447 106.38,-447 106.38,-327.15 106.38,-327.15"/>
<polygon fill="black" stroke="black" points="109.88,-327.15 106.38,-317.15 102.88,-327.15 109.88,-327.15"/>
<text xml:space="preserve" text-anchor="middle" x="125.25" y="-381.88" font-family="Helvetica,sans-Serif" font-size="10.00">callback updates</text>
<text xml:space="preserve" text-anchor="middle" x="125.25" y="-369.12" font-family="Helvetica,sans-Serif" font-size="10.00">job status</text>
</g>
<!-- cloud_run_job -->
<g id="node7" class="node">
<title>cloud_run_job</title>
<path fill="none" stroke="black" d="M505,-316C505,-316 387,-316 387,-316 381,-316 375,-310 375,-304 375,-304 375,-268.25 375,-268.25 375,-262.25 381,-256.25 387,-256.25 387,-256.25 505,-256.25 505,-256.25 511,-256.25 517,-262.25 517,-268.25 517,-268.25 517,-304 517,-304 517,-310 511,-316 505,-316"/>
<text xml:space="preserve" text-anchor="middle" x="446" y="-298.7" font-family="Helvetica,sans-Serif" font-size="14.00">Cloud Run Job</text>
<text xml:space="preserve" text-anchor="middle" x="446" y="-281.45" font-family="Helvetica,sans-Serif" font-size="14.00">FFmpeg container</text>
<text xml:space="preserve" text-anchor="middle" x="446" y="-264.2" font-family="Helvetica,sans-Serif" font-size="14.00">transcoding</text>
</g>
<!-- fastapi&#45;&gt;cloud_run_job -->
<g id="edge8" class="edge">
<title>fastapi&#45;&gt;cloud_run_job</title>
<path fill="none" stroke="black" d="M247.42,-426.41C247.42,-379.88 247.42,-296 247.42,-296 247.42,-296 363.07,-296 363.07,-296"/>
<polygon fill="black" stroke="black" points="363.07,-299.5 373.07,-296 363.07,-292.5 363.07,-299.5"/>
<text xml:space="preserve" text-anchor="middle" x="414.38" y="-388.25" font-family="Helvetica,sans-Serif" font-size="10.00">google&#45;cloud&#45;run</text>
<text xml:space="preserve" text-anchor="middle" x="414.38" y="-375.5" font-family="Helvetica,sans-Serif" font-size="10.00">run_job() + payload</text>
<text xml:space="preserve" text-anchor="middle" x="414.38" y="-362.75" font-family="Helvetica,sans-Serif" font-size="10.00">execution_name</text>
</g>
<!-- timeline&#45;&gt;fastapi -->
<g id="edge5" class="edge">
<title>timeline&#45;&gt;fastapi</title>
<path fill="none" stroke="black" d="M235,-556.86C235,-556.86 235,-498.24 235,-498.24"/>
<polygon fill="black" stroke="black" points="238.5,-498.24 235,-488.24 231.5,-498.24 238.5,-498.24"/>
<text xml:space="preserve" text-anchor="middle" x="253" y="-518.75" font-family="Helvetica,sans-Serif" font-size="10.00">GraphQL</text>
</g>
<!-- cloud_run_job&#45;&gt;fastapi -->
<g id="edge10" class="edge">
<title>cloud_run_job&#45;&gt;fastapi</title>
<path fill="none" stroke="black" d="M374.7,-276C306.06,-276 212.58,-276 212.58,-276 212.58,-276 212.58,-414.88 212.58,-414.88"/>
<polygon fill="black" stroke="black" points="209.08,-414.88 212.58,-424.88 216.08,-414.88 209.08,-414.88"/>
<text xml:space="preserve" text-anchor="middle" x="585.62" y="-381.88" font-family="Helvetica,sans-Serif" font-size="10.00">POST /jobs/{id}/callback</text>
<text xml:space="preserve" text-anchor="middle" x="585.62" y="-369.12" font-family="Helvetica,sans-Serif" font-size="10.00">update status</text>
</g>
<!-- gcs -->
<g id="node8" class="node">
<title>gcs</title>
<polygon fill="none" stroke="black" points="510.25,-160 507.25,-164 486.25,-164 483.25,-160 381.75,-160 381.75,-117.5 510.25,-117.5 510.25,-160"/>
<text xml:space="preserve" text-anchor="middle" x="446" y="-142.7" font-family="Helvetica,sans-Serif" font-size="14.00">GCS Buckets</text>
<text xml:space="preserve" text-anchor="middle" x="446" y="-125.45" font-family="Helvetica,sans-Serif" font-size="14.00">(S3&#45;compat API)</text>
</g>
<!-- cloud_run_job&#45;&gt;gcs -->
<g id="edge9" class="edge">
<title>cloud_run_job&#45;&gt;gcs</title>
<path fill="none" stroke="black" d="M446,-255.95C446,-255.95 446,-171.81 446,-171.81"/>
<polygon fill="black" stroke="black" points="449.5,-171.81 446,-161.81 442.5,-171.81 449.5,-171.81"/>
<text xml:space="preserve" text-anchor="middle" x="492.12" y="-217.75" font-family="Helvetica,sans-Serif" font-size="10.00">S3 compat (HMAC)</text>
<text xml:space="preserve" text-anchor="middle" x="492.12" y="-205" font-family="Helvetica,sans-Serif" font-size="10.00">download input</text>
<text xml:space="preserve" text-anchor="middle" x="492.12" y="-192.25" font-family="Helvetica,sans-Serif" font-size="10.00">upload output</text>
</g>
<!-- bucket_in -->
<g id="node9" class="node">
<title>bucket_in</title>
<polygon fill="none" stroke="black" points="414.75,-58.5 307.25,-58.5 307.25,-16 420.75,-16 420.75,-52.5 414.75,-58.5"/>
<polyline fill="none" stroke="black" points="414.75,-58.5 414.75,-52.5"/>
<polyline fill="none" stroke="black" points="420.75,-52.5 414.75,-52.5"/>
<text xml:space="preserve" text-anchor="middle" x="364" y="-41.2" font-family="Helvetica,sans-Serif" font-size="14.00">mpr&#45;media&#45;in/</text>
<text xml:space="preserve" text-anchor="middle" x="364" y="-23.95" font-family="Helvetica,sans-Serif" font-size="14.00">input videos</text>
</g>
<!-- gcs&#45;&gt;bucket_in -->
<g id="edge12" class="edge">
<title>gcs&#45;&gt;bucket_in</title>
<path fill="none" stroke="black" stroke-dasharray="1,5" d="M401.25,-117.22C401.25,-100 401.25,-75.96 401.25,-58.74"/>
</g>
<!-- bucket_out -->
<g id="node10" class="node">
<title>bucket_out</title>
<polygon fill="none" stroke="black" points="617.12,-58.5 478.88,-58.5 478.88,-16 623.12,-16 623.12,-52.5 617.12,-58.5"/>
<polyline fill="none" stroke="black" points="617.12,-58.5 617.12,-52.5"/>
<polyline fill="none" stroke="black" points="623.12,-52.5 617.12,-52.5"/>
<text xml:space="preserve" text-anchor="middle" x="551" y="-41.2" font-family="Helvetica,sans-Serif" font-size="14.00">mpr&#45;media&#45;out/</text>
<text xml:space="preserve" text-anchor="middle" x="551" y="-23.95" font-family="Helvetica,sans-Serif" font-size="14.00">transcoded output</text>
</g>
<!-- gcs&#45;&gt;bucket_out -->
<g id="edge13" class="edge">
<title>gcs&#45;&gt;bucket_out</title>
<path fill="none" stroke="black" stroke-dasharray="1,5" d="M494.56,-117.22C494.56,-100 494.56,-75.96 494.56,-58.74"/>
</g>
</g>
</svg>

After

Width:  |  Height:  |  Size: 15 KiB

View File

@@ -68,9 +68,50 @@ aws s3 cp video.mp4 s3://mpr-media-in/
aws s3 sync /local/media/ s3://mpr-media-in/
```
## GCP Production (GCS via S3 compatibility)
GCS exposes an S3-compatible API. The same `core/storage/s3.py` boto3 code works
with no changes — only the endpoint and credentials differ.
### GCS HMAC Keys
Generate under **Cloud Storage → Settings → Interoperability** in the GCP console.
These act as `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`.
### Configuration
```bash
S3_ENDPOINT_URL=https://storage.googleapis.com
S3_BUCKET_IN=mpr-media-in
S3_BUCKET_OUT=mpr-media-out
AWS_ACCESS_KEY_ID=<GCS HMAC access key>
AWS_SECRET_ACCESS_KEY=<GCS HMAC secret>
# Executor
MPR_EXECUTOR=gcp
GCP_PROJECT_ID=my-project
GCP_REGION=us-central1
CLOUD_RUN_JOB=mpr-transcode
CALLBACK_URL=https://mpr.mcrn.ar/api
CALLBACK_API_KEY=<secret>
```
### Upload Files to GCS
```bash
gcloud storage cp video.mp4 gs://mpr-media-in/
# Or with the aws CLI via compat endpoint
aws --endpoint-url https://storage.googleapis.com s3 cp video.mp4 s3://mpr-media-in/
```
### Cloud Run Job Handler
`core/task/gcp_handler.py` is the Cloud Run Job entrypoint. It reads the job payload
from `MPR_JOB_PAYLOAD` (injected by `GCPExecutor`), uses `core/storage` for all
GCS access (S3 compat), and POSTs the completion callback to the API.
Set the Cloud Run Job command to: `python -m core.task.gcp_handler`
## Storage Module
`core/storage.py` provides all S3 operations:
`core/storage/` package provides all S3 operations:
```python
from core.storage import (
@@ -114,7 +155,14 @@ mutation { scanMediaFolder { found registered skipped files } }
4. Uploads result to `S3_BUCKET_OUT`
5. Calls back to API with result
Both paths use the same S3 buckets and key structure.
### Cloud Run Job Mode (GCP)
1. `GCPExecutor` triggers Cloud Run Job with payload in `MPR_JOB_PAYLOAD`
2. `core/task/gcp_handler.py` downloads source from `S3_BUCKET_IN` (GCS S3 compat)
3. Runs FFmpeg in container
4. Uploads result to `S3_BUCKET_OUT` (GCS S3 compat)
5. Calls back to API with result
All three paths use the same S3-compatible bucket names and key structure.
## Supported File Types

View File

@@ -9,8 +9,9 @@
<body>
<h1>MPR - Media Processor</h1>
<p>
Media transcoding platform with dual execution modes: local (Celery
+ MinIO) and cloud (AWS Step Functions + Lambda + S3).
Media transcoding platform with three execution modes: local (Celery
+ MinIO), AWS (Step Functions + Lambda + S3), and GCP (Cloud Run
Jobs + GCS). Storage is S3-compatible across all environments.
</p>
<nav>
@@ -54,6 +55,21 @@
>Open full size</a
>
</div>
<div class="diagram">
<h3>GCP Architecture (Production)</h3>
<object
type="image/svg+xml"
data="architecture/01c-gcp-architecture.svg"
>
<img
src="architecture/01c-gcp-architecture.svg"
alt="GCP Architecture"
/>
</object>
<a href="architecture/01c-gcp-architecture.svg" target="_blank"
>Open full size</a
>
</div>
</div>
<div class="legend">
@@ -73,7 +89,11 @@
</li>
<li>
<span class="color-box" style="background: #fde8d0"></span>
AWS (Step Functions, Lambda - cloud mode)
AWS (Step Functions, Lambda)
</li>
<li>
<span class="color-box" style="background: #e8f0fd"></span>
GCP (Cloud Run Jobs + GCS)
</li>
<li>
<span class="color-box" style="background: #f8e8f0"></span>
@@ -81,7 +101,7 @@
</li>
<li>
<span class="color-box" style="background: #f0f0f0"></span>
S3 Storage (MinIO local / AWS S3 cloud)
S3-compatible Storage (MinIO / AWS S3 / GCS)
</li>
</ul>
</div>

View File

@@ -7,17 +7,17 @@ Supported generators:
- TypeScriptGenerator: TypeScript interfaces
- ProtobufGenerator: Protocol Buffer definitions
- PrismaGenerator: Prisma schema
- GrapheneGenerator: Graphene ObjectType/InputObjectType classes
- StrawberryGenerator: Strawberry type/input/enum classes
"""
from typing import Dict, Type
from .base import BaseGenerator
from .django import DjangoGenerator
from .graphene import GrapheneGenerator
from .prisma import PrismaGenerator
from .protobuf import ProtobufGenerator
from .pydantic import PydanticGenerator
from .strawberry import StrawberryGenerator
from .typescript import TypeScriptGenerator
# Registry of available generators
@@ -29,14 +29,14 @@ GENERATORS: Dict[str, Type[BaseGenerator]] = {
"protobuf": ProtobufGenerator,
"proto": ProtobufGenerator, # Alias
"prisma": PrismaGenerator,
"graphene": GrapheneGenerator,
"strawberry": StrawberryGenerator,
}
__all__ = [
"BaseGenerator",
"PydanticGenerator",
"DjangoGenerator",
"GrapheneGenerator",
"StrawberryGenerator",
"TypeScriptGenerator",
"ProtobufGenerator",
"PrismaGenerator",

View File

@@ -1,28 +1,29 @@
"""
Graphene Generator
Strawberry Generator
Generates graphene ObjectType and InputObjectType classes from model definitions.
Generates strawberry type, input, and enum classes from model definitions.
Only generates type definitions queries, mutations, and resolvers are hand-written.
"""
import dataclasses as dc
from enum import Enum
from pathlib import Path
from typing import Any, List, get_type_hints
from ..helpers import get_origin_name, get_type_name, unwrap_optional
from ..loader.schema import EnumDefinition, FieldDefinition, ModelDefinition
from ..types import GRAPHENE_RESOLVERS
from ..types import STRAWBERRY_RESOLVERS
from .base import BaseGenerator
class GrapheneGenerator(BaseGenerator):
"""Generates graphene type definition files."""
class StrawberryGenerator(BaseGenerator):
"""Generates strawberry type definition files."""
def file_extension(self) -> str:
return ".py"
def generate(self, models, output_path: Path) -> None:
"""Generate graphene types to output_path."""
"""Generate strawberry types to output_path."""
output_path.parent.mkdir(parents=True, exist_ok=True)
if hasattr(models, "models"):
@@ -47,22 +48,18 @@ class GrapheneGenerator(BaseGenerator):
enums: List[EnumDefinition],
api_models: List[ModelDefinition],
) -> str:
"""Generate from ModelDefinition objects."""
lines = self._generate_header()
# Generate enums as graphene.Enum
for enum_def in enums:
lines.extend(self._generate_enum(enum_def))
lines.append("")
lines.append("")
# Generate domain models as ObjectType
for model_def in models:
lines.extend(self._generate_object_type(model_def))
lines.append("")
lines.append("")
# Generate API models — request types as InputObjectType, others as ObjectType
for model_def in api_models:
if model_def.name.endswith("Request"):
lines.extend(self._generate_input_type(model_def))
@@ -74,7 +71,6 @@ class GrapheneGenerator(BaseGenerator):
return "\n".join(lines).rstrip() + "\n"
def _generate_from_dataclasses(self, dataclasses: List[type]) -> str:
"""Generate from Python dataclasses."""
lines = self._generate_header()
enums_generated = set()
@@ -99,37 +95,38 @@ class GrapheneGenerator(BaseGenerator):
def _generate_header(self) -> List[str]:
return [
'"""',
"Graphene Types - GENERATED FILE",
"Strawberry Types - GENERATED FILE",
"",
"Do not edit directly. Regenerate using modelgen.",
'"""',
"",
"import graphene",
"import strawberry",
"from enum import Enum",
"from typing import List, Optional",
"from uuid import UUID",
"from datetime import datetime",
"from strawberry.scalars import JSON",
"",
"",
]
def _generate_enum(self, enum_def: EnumDefinition) -> List[str]:
"""Generate graphene.Enum from EnumDefinition."""
lines = [f"class {enum_def.name}(graphene.Enum):"]
lines = ["@strawberry.enum", f"class {enum_def.name}(Enum):"]
for name, value in enum_def.values:
lines.append(f' {name} = "{value}"')
return lines
def _generate_enum_from_python(self, enum_cls: type) -> List[str]:
"""Generate graphene.Enum from Python Enum."""
lines = [f"class {enum_cls.__name__}(graphene.Enum):"]
lines = ["@strawberry.enum", f"class {enum_cls.__name__}(Enum):"]
for member in enum_cls:
lines.append(f' {member.name} = "{member.value}"')
return lines
def _generate_object_type(self, model_def: ModelDefinition) -> List[str]:
"""Generate graphene.ObjectType from ModelDefinition."""
name = model_def.name
# Append Type suffix if not already present
type_name = f"{name}Type" if not name.endswith("Type") else name
lines = [f"class {type_name}(graphene.ObjectType):"]
lines = ["@strawberry.type", f"class {type_name}:"]
if model_def.docstring:
doc = model_def.docstring.strip().split("\n")[0]
lines.append(f' """{doc}"""')
@@ -139,23 +136,19 @@ class GrapheneGenerator(BaseGenerator):
lines.append(" pass")
else:
for field in model_def.fields:
graphene_type = self._resolve_type(field.type_hint, field.optional)
lines.append(f" {field.name} = {graphene_type}")
type_str = self._resolve_type(field.type_hint, optional=True)
lines.append(f" {field.name}: {type_str} = None")
return lines
def _generate_input_type(self, model_def: ModelDefinition) -> List[str]:
"""Generate graphene.InputObjectType from ModelDefinition."""
import dataclasses as dc
name = model_def.name
# Convert FooRequest -> FooInput
if name.endswith("Request"):
input_name = name[: -len("Request")] + "Input"
else:
input_name = f"{name}Input"
lines = [f"class {input_name}(graphene.InputObjectType):"]
lines = ["@strawberry.input", f"class {input_name}:"]
if model_def.docstring:
doc = model_def.docstring.strip().split("\n")[0]
lines.append(f' """{doc}"""')
@@ -164,73 +157,64 @@ class GrapheneGenerator(BaseGenerator):
if not model_def.fields:
lines.append(" pass")
else:
# Required fields first, then optional/defaulted
required = []
optional = []
for field in model_def.fields:
graphene_type = self._resolve_type(field.type_hint, field.optional)
# Required only if not optional AND no default value
has_default = field.default is not dc.MISSING
if not field.optional and not has_default:
graphene_type = self._make_required(graphene_type)
elif has_default and not field.optional:
graphene_type = self._add_default(graphene_type, field.default)
lines.append(f" {field.name} = {graphene_type}")
required.append(field)
else:
optional.append(field)
for field in required:
type_str = self._resolve_type(field.type_hint, optional=False)
lines.append(f" {field.name}: {type_str}")
for field in optional:
has_default = field.default is not dc.MISSING
if has_default and not callable(field.default):
type_str = self._resolve_type(field.type_hint, optional=False)
lines.append(f" {field.name}: {type_str} = {field.default!r}")
else:
type_str = self._resolve_type(field.type_hint, optional=True)
lines.append(f" {field.name}: {type_str} = None")
return lines
def _generate_object_type_from_dataclass(self, cls: type) -> List[str]:
"""Generate graphene.ObjectType from a dataclass."""
import dataclasses as dc
type_name = f"{cls.__name__}Type"
lines = [f"class {type_name}(graphene.ObjectType):"]
lines = ["@strawberry.type", f"class {type_name}:"]
hints = get_type_hints(cls)
for name, type_hint in hints.items():
if name.startswith("_"):
continue
graphene_type = self._resolve_type(type_hint, False)
lines.append(f" {name} = {graphene_type}")
type_str = self._resolve_type(type_hint, optional=True)
lines.append(f" {name}: {type_str} = None")
return lines
def _resolve_type(self, type_hint: Any, optional: bool) -> str:
"""Resolve Python type to graphene field call string."""
"""Resolve Python type hint to a strawberry annotation string."""
base, is_optional = unwrap_optional(type_hint)
optional = optional or is_optional
origin = get_origin_name(base)
type_name = get_type_name(base)
# Look up resolver
resolver = (
GRAPHENE_RESOLVERS.get(origin)
or GRAPHENE_RESOLVERS.get(type_name)
or GRAPHENE_RESOLVERS.get(base)
STRAWBERRY_RESOLVERS.get(origin)
or STRAWBERRY_RESOLVERS.get(type_name)
or STRAWBERRY_RESOLVERS.get(base)
or (
GRAPHENE_RESOLVERS["enum"]
STRAWBERRY_RESOLVERS["enum"]
if isinstance(base, type) and issubclass(base, Enum)
else None
)
)
result = resolver(base) if resolver else "graphene.String"
inner = resolver(base) if resolver else "str"
# List types already have () syntax from resolver
if result.startswith("graphene.List("):
return result
# Scalar types: add () call
return f"{result}()"
def _make_required(self, field_str: str) -> str:
"""Add required=True to a graphene field."""
if field_str.endswith("()"):
return field_str[:-1] + "required=True)"
return field_str
def _add_default(self, field_str: str, default: Any) -> str:
"""Add default_value to a graphene field."""
if callable(default):
# default_factory — skip, graphene doesn't support factories
return field_str
if field_str.endswith("()"):
return field_str[:-1] + f"default_value={default!r})"
return field_str
if optional:
return f"Optional[{inner}]"
return inner

View File

@@ -139,34 +139,34 @@ PRISMA_SPECIAL: dict[str, str] = {
}
# =============================================================================
# Graphene Type Resolvers
# Strawberry Type Resolvers
# =============================================================================
def _resolve_graphene_list(base: Any) -> str:
"""Resolve graphene List type."""
def _resolve_strawberry_list(base: Any) -> str:
"""Resolve strawberry List type annotation."""
args = get_args(base)
if args:
inner = args[0]
if inner is str:
return "graphene.List(graphene.String)"
return "List[str]"
elif inner is int:
return "graphene.List(graphene.Int)"
return "List[int]"
elif inner is float:
return "graphene.List(graphene.Float)"
return "List[float]"
elif inner is bool:
return "graphene.List(graphene.Boolean)"
return "graphene.List(graphene.String)"
return "List[bool]"
return "List[str]"
GRAPHENE_RESOLVERS: dict[Any, Callable[[Any], str]] = {
str: lambda _: "graphene.String",
int: lambda _: "graphene.Int",
float: lambda _: "graphene.Float",
bool: lambda _: "graphene.Boolean",
"UUID": lambda _: "graphene.UUID",
"datetime": lambda _: "graphene.DateTime",
"dict": lambda _: "graphene.JSONString",
"list": _resolve_graphene_list,
"enum": lambda base: f"graphene.String", # Enums exposed as strings in GQL
STRAWBERRY_RESOLVERS: dict[Any, Callable[[Any], str]] = {
str: lambda _: "str",
int: lambda _: "int",
float: lambda _: "float",
bool: lambda _: "bool",
"UUID": lambda _: "UUID",
"datetime": lambda _: "datetime",
"dict": lambda _: "JSON",
"list": _resolve_strawberry_list,
"enum": lambda base: base.__name__,
}

2
requirements-worker.txt Normal file
View File

@@ -0,0 +1,2 @@
-r requirements.txt
ffmpeg-python>=0.2.0

View File

@@ -12,19 +12,19 @@ pydantic>=2.5.0
celery[redis]>=5.3.0
redis>=5.0.0
# FFmpeg
ffmpeg-python>=0.2.0
# gRPC
grpcio>=1.60.0
grpcio-tools>=1.60.0
# AWS
boto3>=1.34.0
requests>=2.31.0
# GCP (optional — only needed when MPR_EXECUTOR=gcp)
google-cloud-run>=0.10.0
# GraphQL
graphene>=3.3
starlette-graphene3>=0.6.0
strawberry-graphql[fastapi]>=0.311.0
# Testing
pytest>=7.4.0

View File

@@ -1,5 +1,5 @@
/**
* API client for FastAPI backend
* GraphQL API client
*/
import type {
@@ -8,34 +8,51 @@ import type {
TranscodeJob,
CreateJobRequest,
SystemStatus,
WorkerStatus,
} from "./types";
const API_BASE = "/api";
const GRAPHQL_URL = "/api/graphql";
async function request<T>(path: string, options?: RequestInit): Promise<T> {
const response = await fetch(`${API_BASE}${path}`, {
headers: {
"Content-Type": "application/json",
},
...options,
async function gql<T>(query: string, variables?: Record<string, unknown>): Promise<T> {
const response = await fetch(GRAPHQL_URL, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query, variables }),
});
if (!response.ok) {
const error = await response.text();
throw new Error(`API error: ${response.status} - ${error}`);
const json = await response.json();
if (json.errors?.length) {
throw new Error(json.errors[0].message);
}
return response.json();
return json.data as T;
}
// Assets
export async function getAssets(): Promise<MediaAsset[]> {
return request("/assets/");
const data = await gql<{ assets: MediaAsset[] }>(`
query {
assets {
id filename file_path status error_message file_size duration
video_codec audio_codec width height framerate bitrate
properties comments tags created_at updated_at
}
}
`);
return data.assets;
}
export async function getAsset(id: string): Promise<MediaAsset> {
return request(`/assets/${id}`);
const data = await gql<{ asset: MediaAsset }>(`
query($id: UUID!) {
asset(id: $id) {
id filename file_path status error_message file_size duration
video_codec audio_codec width height framerate bitrate
properties comments tags created_at updated_at
}
}
`, { id });
return data.asset;
}
export async function scanMediaFolder(): Promise<{
@@ -44,43 +61,95 @@ export async function scanMediaFolder(): Promise<{
skipped: number;
files: string[];
}> {
return request("/assets/scan", {
method: "POST",
});
const data = await gql<{ scan_media_folder: { found: number; registered: number; skipped: number; files: string[] } }>(`
mutation {
scan_media_folder { found registered skipped files }
}
`);
return data.scan_media_folder;
}
// Presets
export async function getPresets(): Promise<TranscodePreset[]> {
return request("/presets/");
const data = await gql<{ presets: TranscodePreset[] }>(`
query {
presets {
id name description is_builtin container
video_codec video_bitrate video_crf video_preset resolution framerate
audio_codec audio_bitrate audio_channels audio_samplerate
extra_args created_at updated_at
}
}
`);
return data.presets;
}
// Jobs
export async function getJobs(): Promise<TranscodeJob[]> {
return request("/jobs/");
const data = await gql<{ jobs: TranscodeJob[] }>(`
query {
jobs {
id source_asset_id preset_id preset_snapshot trim_start trim_end
output_filename output_path output_asset_id status progress
current_frame current_time speed error_message celery_task_id
execution_arn priority created_at started_at completed_at
}
}
`);
return data.jobs;
}
export async function getJob(id: string): Promise<TranscodeJob> {
return request(`/jobs/${id}`);
const data = await gql<{ job: TranscodeJob }>(`
query($id: UUID!) {
job(id: $id) {
id source_asset_id preset_id preset_snapshot trim_start trim_end
output_filename output_path output_asset_id status progress
current_frame current_time speed error_message celery_task_id
execution_arn priority created_at started_at completed_at
}
}
`, { id });
return data.job;
}
export async function createJob(data: CreateJobRequest): Promise<TranscodeJob> {
return request("/jobs/", {
method: "POST",
body: JSON.stringify(data),
export async function createJob(req: CreateJobRequest): Promise<TranscodeJob> {
const data = await gql<{ create_job: TranscodeJob }>(`
mutation($input: CreateJobInput!) {
create_job(input: $input) {
id source_asset_id status output_filename progress created_at
}
}
`, {
input: {
source_asset_id: req.source_asset_id,
preset_id: req.preset_id,
trim_start: req.trim_start,
trim_end: req.trim_end,
output_filename: req.output_filename ?? null,
priority: req.priority ?? 0,
},
});
return data.create_job;
}
export async function cancelJob(id: string): Promise<TranscodeJob> {
return request(`/jobs/${id}/cancel`, {
method: "POST",
});
const data = await gql<{ cancel_job: TranscodeJob }>(`
mutation($id: UUID!) {
cancel_job(id: $id) {
id status
}
}
`, { id });
return data.cancel_job;
}
// System
export async function getSystemStatus(): Promise<SystemStatus> {
return request("/system/status");
const data = await gql<{ system_status: SystemStatus }>(`
query {
system_status { status version }
}
export async function getWorkerStatus(): Promise<WorkerStatus> {
return request("/system/worker");
`);
return data.system_status;
}