Remove REST API, keep GraphQL as sole API

- Add missing GraphQL mutations: retryJob, updateAsset, deleteAsset
- Add UpdateAssetRequest and DeleteResult to schema source of truth
- Move Lambda callback endpoint to main.py (only REST endpoint)
- Remove REST routes, pydantic schemas, and deps
- Remove pydantic target from modelgen.json
- Update architecture diagrams and documentation
This commit is contained in:
2026-02-12 20:07:51 -03:00
parent dbbaad5b94
commit 4e9d731cff
24 changed files with 393 additions and 1031 deletions

View File

@@ -1,54 +0,0 @@
"""
FastAPI dependencies.
Provides database sessions, settings, and common dependencies.
"""
import os
from functools import lru_cache
from typing import Generator
import django
from django.conf import settings as django_settings
# Initialize Django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mpr.settings")
django.setup()
from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset
@lru_cache
def get_settings():
"""Get Django settings."""
return django_settings
def get_asset(asset_id: str) -> MediaAsset:
"""Get asset by ID or raise 404."""
from fastapi import HTTPException
try:
return MediaAsset.objects.get(id=asset_id)
except MediaAsset.DoesNotExist:
raise HTTPException(status_code=404, detail="Asset not found")
def get_preset(preset_id: str) -> TranscodePreset:
"""Get preset by ID or raise 404."""
from fastapi import HTTPException
try:
return TranscodePreset.objects.get(id=preset_id)
except TranscodePreset.DoesNotExist:
raise HTTPException(status_code=404, detail="Preset not found")
def get_job(job_id: str) -> TranscodeJob:
"""Get job by ID or raise 404."""
from fastapi import HTTPException
try:
return TranscodeJob.objects.get(id=job_id)
except TranscodeJob.DoesNotExist:
raise HTTPException(status_code=404, detail="Job not found")

View File

@@ -1,7 +1,7 @@
"""
GraphQL API using graphene, mounted on FastAPI/Starlette.
Provides the same data as the REST API but via GraphQL queries and mutations.
Primary API for MPR — all client interactions go through GraphQL.
Uses Django ORM directly for data access.
Types are generated from schema/ via modelgen — see api/schema/graphql.py.
"""
@@ -12,11 +12,13 @@ import graphene
from api.schema.graphql import (
CreateJobInput,
DeleteResultType,
MediaAssetType,
ScanResultType,
SystemStatusType,
TranscodeJobType,
TranscodePresetType,
UpdateAssetInput,
)
from core.storage import BUCKET_IN, list_objects
@@ -238,10 +240,83 @@ class CancelJob(graphene.Mutation):
return job
class RetryJob(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
Output = TranscodeJobType
def mutate(self, info, id):
from mpr.media_assets.models import TranscodeJob
try:
job = TranscodeJob.objects.get(id=id)
except TranscodeJob.DoesNotExist:
raise Exception("Job not found")
if job.status != "failed":
raise Exception("Only failed jobs can be retried")
job.status = "pending"
job.progress = 0
job.error_message = None
job.save(update_fields=["status", "progress", "error_message"])
return job
class UpdateAsset(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
input = UpdateAssetInput(required=True)
Output = MediaAssetType
def mutate(self, info, id, input):
from mpr.media_assets.models import MediaAsset
try:
asset = MediaAsset.objects.get(id=id)
except MediaAsset.DoesNotExist:
raise Exception("Asset not found")
update_fields = []
if input.comments is not None:
asset.comments = input.comments
update_fields.append("comments")
if input.tags is not None:
asset.tags = input.tags
update_fields.append("tags")
if update_fields:
asset.save(update_fields=update_fields)
return asset
class DeleteAsset(graphene.Mutation):
class Arguments:
id = graphene.UUID(required=True)
Output = DeleteResultType
def mutate(self, info, id):
from mpr.media_assets.models import MediaAsset
try:
asset = MediaAsset.objects.get(id=id)
asset.delete()
return DeleteResultType(ok=True)
except MediaAsset.DoesNotExist:
raise Exception("Asset not found")
class Mutation(graphene.ObjectType):
scan_media_folder = ScanMediaFolder.Field()
create_job = CreateJob.Field()
cancel_job = CancelJob.Field()
retry_job = RetryJob.Field()
update_asset = UpdateAsset.Field()
delete_asset = DeleteAsset.Field()
# ---------------------------------------------------------------------------

View File

@@ -1,11 +1,13 @@
"""
MPR FastAPI Application
Main entry point for the REST API.
Serves GraphQL API and Lambda callback endpoint.
"""
import os
import sys
from typing import Optional
from uuid import UUID
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -17,16 +19,17 @@ import django
django.setup()
from fastapi import FastAPI
from fastapi import FastAPI, Header, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from starlette_graphene3 import GraphQLApp, make_graphiql_handler
from api.graphql import schema as graphql_schema
from api.routes import assets_router, jobs_router, presets_router, system_router
from starlette_graphene3 import GraphQLApp, make_graphiql_handler
CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "")
app = FastAPI(
title="MPR API",
description="Media Processor REST API",
description="Media Processor — GraphQL API",
version="0.1.0",
docs_url="/docs",
redoc_url="/redoc",
@@ -41,12 +44,6 @@ app.add_middleware(
allow_headers=["*"],
)
# Routes - all under /api prefix
app.include_router(system_router, prefix="/api")
app.include_router(assets_router, prefix="/api")
app.include_router(presets_router, prefix="/api")
app.include_router(jobs_router, prefix="/api")
# GraphQL
app.mount("/graphql", GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler()))
@@ -57,5 +54,45 @@ def root():
return {
"name": "MPR API",
"version": "0.1.0",
"docs": "/docs",
"graphql": "/graphql",
}
@app.post("/api/jobs/{job_id}/callback")
def job_callback(
job_id: UUID,
payload: dict,
x_api_key: Optional[str] = Header(None),
):
"""
Callback endpoint for Lambda to report job completion.
Protected by API key.
"""
if CALLBACK_API_KEY and x_api_key != CALLBACK_API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key")
from django.utils import timezone
from mpr.media_assets.models import TranscodeJob
try:
job = TranscodeJob.objects.get(id=job_id)
except TranscodeJob.DoesNotExist:
raise HTTPException(status_code=404, detail="Job not found")
status = payload.get("status", "failed")
job.status = status
job.progress = 100.0 if status == "completed" else job.progress
update_fields = ["status", "progress"]
if payload.get("error"):
job.error_message = payload["error"]
update_fields.append("error_message")
if status in ("completed", "failed"):
job.completed_at = timezone.now()
update_fields.append("completed_at")
job.save(update_fields=update_fields)
return {"ok": True}

View File

@@ -1,8 +0,0 @@
"""API Routes."""
from .assets import router as assets_router
from .jobs import router as jobs_router
from .presets import router as presets_router
from .system import router as system_router
__all__ = ["assets_router", "jobs_router", "presets_router", "system_router"]

View File

@@ -1,117 +0,0 @@
"""
Asset endpoints - media file registration and metadata.
"""
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from api.deps import get_asset
from api.schema import AssetCreate, AssetResponse, AssetUpdate
from core.storage import BUCKET_IN, list_objects
router = APIRouter(prefix="/assets", tags=["assets"])
# Supported media extensions
VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"}
AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"}
MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS
@router.post("/", response_model=AssetResponse, status_code=201)
def create_asset(data: AssetCreate):
"""Register a media file as an asset."""
from mpr.media_assets.models import MediaAsset
asset = MediaAsset.objects.create(
filename=data.filename or data.file_path.split("/")[-1],
file_path=data.file_path,
file_size=data.file_size,
)
return asset
@router.get("/", response_model=list[AssetResponse])
def list_assets(
status: Optional[str] = Query(None, description="Filter by status"),
limit: int = Query(50, ge=1, le=100),
offset: int = Query(0, ge=0),
):
"""List assets with optional filtering."""
from mpr.media_assets.models import MediaAsset
qs = MediaAsset.objects.all()
if status:
qs = qs.filter(status=status)
return list(qs[offset : offset + limit])
@router.get("/{asset_id}", response_model=AssetResponse)
def get_asset_detail(asset_id: UUID, asset=Depends(get_asset)):
"""Get asset details."""
return asset
@router.patch("/{asset_id}", response_model=AssetResponse)
def update_asset(asset_id: UUID, data: AssetUpdate, asset=Depends(get_asset)):
"""Update asset metadata (comments, tags)."""
update_fields = []
if data.comments is not None:
asset.comments = data.comments
update_fields.append("comments")
if data.tags is not None:
asset.tags = data.tags
update_fields.append("tags")
if update_fields:
asset.save(update_fields=update_fields)
return asset
@router.delete("/{asset_id}", status_code=204)
def delete_asset(asset_id: UUID, asset=Depends(get_asset)):
"""Delete an asset."""
asset.delete()
@router.post("/scan", response_model=dict)
def scan_media_folder():
"""
Scan the S3 media-in bucket for new video/audio files and register them as assets.
"""
from mpr.media_assets.models import MediaAsset
# List objects from S3 bucket
objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS)
# Get existing filenames to avoid duplicates
existing_filenames = set(MediaAsset.objects.values_list("filename", flat=True))
registered_files = []
skipped_files = []
for obj in objects:
if obj["filename"] in existing_filenames:
skipped_files.append(obj["filename"])
continue
try:
MediaAsset.objects.create(
filename=obj["filename"],
file_path=obj["key"],
file_size=obj["size"],
)
registered_files.append(obj["filename"])
except Exception as e:
print(f"Error registering {obj['filename']}: {e}")
return {
"found": len(objects),
"registered": len(registered_files),
"skipped": len(skipped_files),
"files": registered_files,
}

View File

@@ -1,233 +0,0 @@
"""
Job endpoints - transcode/trim job management.
"""
import os
from pathlib import Path
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from api.deps import get_asset, get_job, get_preset
from api.schema import JobCreate, JobResponse
router = APIRouter(prefix="/jobs", tags=["jobs"])
CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "")
@router.post("/", response_model=JobResponse, status_code=201)
def create_job(data: JobCreate):
"""
Create a transcode or trim job.
- With preset_id: Full transcode using preset settings
- Without preset_id but with trim_start/end: Trim only (stream copy)
"""
from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset
# Get source asset
try:
source = MediaAsset.objects.get(id=data.source_asset_id)
except MediaAsset.DoesNotExist:
raise HTTPException(status_code=404, detail="Source asset not found")
# Get preset if specified
preset = None
preset_snapshot = {}
if data.preset_id:
try:
preset = TranscodePreset.objects.get(id=data.preset_id)
preset_snapshot = {
"name": preset.name,
"container": preset.container,
"video_codec": preset.video_codec,
"video_bitrate": preset.video_bitrate,
"video_crf": preset.video_crf,
"video_preset": preset.video_preset,
"resolution": preset.resolution,
"framerate": preset.framerate,
"audio_codec": preset.audio_codec,
"audio_bitrate": preset.audio_bitrate,
"audio_channels": preset.audio_channels,
"audio_samplerate": preset.audio_samplerate,
"extra_args": preset.extra_args,
}
except TranscodePreset.DoesNotExist:
raise HTTPException(status_code=404, detail="Preset not found")
# Validate trim-only job
if not preset and not data.trim_start and not data.trim_end:
raise HTTPException(
status_code=400, detail="Must specify preset_id or trim_start/trim_end"
)
# Generate output filename - stored as S3 key in output bucket
output_filename = data.output_filename
if not output_filename:
stem = Path(source.filename).stem
ext = preset_snapshot.get("container", "mp4") if preset else "mp4"
output_filename = f"{stem}_output.{ext}"
# Create job
job = TranscodeJob.objects.create(
source_asset_id=source.id,
preset_id=preset.id if preset else None,
preset_snapshot=preset_snapshot,
trim_start=data.trim_start,
trim_end=data.trim_end,
output_filename=output_filename,
output_path=output_filename, # S3 key in output bucket
priority=data.priority or 0,
)
# Dispatch based on executor mode
executor_mode = os.environ.get("MPR_EXECUTOR", "local")
if executor_mode == "lambda":
_dispatch_lambda(job, source, preset_snapshot)
else:
_dispatch_celery(job, source, preset_snapshot)
return job
def _dispatch_celery(job, source, preset_snapshot):
"""Dispatch job to Celery worker."""
from task.tasks import run_transcode_job
result = run_transcode_job.delay(
job_id=str(job.id),
source_key=source.file_path,
output_key=job.output_filename,
preset=preset_snapshot or None,
trim_start=job.trim_start,
trim_end=job.trim_end,
duration=source.duration,
)
job.celery_task_id = result.id
job.save(update_fields=["celery_task_id"])
def _dispatch_lambda(job, source, preset_snapshot):
"""Dispatch job to AWS Step Functions."""
from task.executor import get_executor
executor = get_executor()
executor.run(
job_id=str(job.id),
source_path=source.file_path,
output_path=job.output_filename,
preset=preset_snapshot or None,
trim_start=job.trim_start,
trim_end=job.trim_end,
duration=source.duration,
)
@router.post("/{job_id}/callback")
def job_callback(
job_id: UUID,
payload: dict,
x_api_key: Optional[str] = Header(None),
):
"""
Callback endpoint for Lambda to report job completion.
Protected by API key.
"""
if CALLBACK_API_KEY and x_api_key != CALLBACK_API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key")
from django.utils import timezone
from mpr.media_assets.models import TranscodeJob
try:
job = TranscodeJob.objects.get(id=job_id)
except TranscodeJob.DoesNotExist:
raise HTTPException(status_code=404, detail="Job not found")
status = payload.get("status", "failed")
job.status = status
job.progress = 100.0 if status == "completed" else job.progress
update_fields = ["status", "progress"]
if payload.get("error"):
job.error_message = payload["error"]
update_fields.append("error_message")
if status == "completed":
job.completed_at = timezone.now()
update_fields.append("completed_at")
elif status == "failed":
job.completed_at = timezone.now()
update_fields.append("completed_at")
job.save(update_fields=update_fields)
return {"ok": True}
@router.get("/", response_model=list[JobResponse])
def list_jobs(
status: Optional[str] = Query(None, description="Filter by status"),
source_asset_id: Optional[UUID] = Query(None),
limit: int = Query(50, ge=1, le=100),
offset: int = Query(0, ge=0),
):
"""List jobs with optional filtering."""
from mpr.media_assets.models import TranscodeJob
qs = TranscodeJob.objects.all()
if status:
qs = qs.filter(status=status)
if source_asset_id:
qs = qs.filter(source_asset_id=source_asset_id)
return list(qs[offset : offset + limit])
@router.get("/{job_id}", response_model=JobResponse)
def get_job_detail(job_id: UUID, job=Depends(get_job)):
"""Get job details including progress."""
return job
@router.get("/{job_id}/progress")
def get_job_progress(job_id: UUID, job=Depends(get_job)):
"""Get real-time job progress."""
return {
"job_id": str(job.id),
"status": job.status,
"progress": job.progress,
"current_frame": job.current_frame,
"current_time": job.current_time,
"speed": job.speed,
}
@router.post("/{job_id}/cancel", response_model=JobResponse)
def cancel_job(job_id: UUID, job=Depends(get_job)):
"""Cancel a pending or processing job."""
if job.status not in ("pending", "processing"):
raise HTTPException(
status_code=400, detail=f"Cannot cancel job with status: {job.status}"
)
job.status = "cancelled"
job.save(update_fields=["status"])
return job
@router.post("/{job_id}/retry", response_model=JobResponse)
def retry_job(job_id: UUID, job=Depends(get_job)):
"""Retry a failed job."""
if job.status != "failed":
raise HTTPException(status_code=400, detail="Only failed jobs can be retried")
job.status = "pending"
job.progress = 0
job.error_message = None
job.save(update_fields=["status", "progress", "error_message"])
return job

View File

@@ -1,100 +0,0 @@
"""
Preset endpoints - transcode configuration templates.
"""
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from api.deps import get_preset
from api.schema import PresetCreate, PresetResponse, PresetUpdate
router = APIRouter(prefix="/presets", tags=["presets"])
@router.post("/", response_model=PresetResponse, status_code=201)
def create_preset(data: PresetCreate):
"""Create a custom preset."""
from mpr.media_assets.models import TranscodePreset
preset = TranscodePreset.objects.create(
name=data.name,
description=data.description or "",
container=data.container or "mp4",
video_codec=data.video_codec or "libx264",
video_bitrate=data.video_bitrate,
video_crf=data.video_crf,
video_preset=data.video_preset,
resolution=data.resolution,
framerate=data.framerate,
audio_codec=data.audio_codec or "aac",
audio_bitrate=data.audio_bitrate,
audio_channels=data.audio_channels,
audio_samplerate=data.audio_samplerate,
extra_args=data.extra_args or [],
is_builtin=False,
)
return preset
@router.get("/", response_model=list[PresetResponse])
def list_presets(include_builtin: bool = True):
"""List all presets."""
from mpr.media_assets.models import TranscodePreset
qs = TranscodePreset.objects.all()
if not include_builtin:
qs = qs.filter(is_builtin=False)
return list(qs)
@router.get("/{preset_id}", response_model=PresetResponse)
def get_preset_detail(preset_id: UUID, preset=Depends(get_preset)):
"""Get preset details."""
return preset
@router.patch("/{preset_id}", response_model=PresetResponse)
def update_preset(preset_id: UUID, data: PresetUpdate, preset=Depends(get_preset)):
"""Update a custom preset. Builtin presets cannot be modified."""
if preset.is_builtin:
raise HTTPException(status_code=403, detail="Cannot modify builtin preset")
update_fields = []
for field in [
"name",
"description",
"container",
"video_codec",
"video_bitrate",
"video_crf",
"video_preset",
"resolution",
"framerate",
"audio_codec",
"audio_bitrate",
"audio_channels",
"audio_samplerate",
"extra_args",
]:
value = getattr(data, field, None)
if value is not None:
setattr(preset, field, value)
update_fields.append(field)
if update_fields:
preset.save(update_fields=update_fields)
return preset
@router.delete("/{preset_id}", status_code=204)
def delete_preset(preset_id: UUID, preset=Depends(get_preset)):
"""Delete a custom preset. Builtin presets cannot be deleted."""
if preset.is_builtin:
raise HTTPException(status_code=403, detail="Cannot delete builtin preset")
preset.delete()

View File

@@ -1,51 +0,0 @@
"""
System endpoints - health checks and FFmpeg capabilities.
"""
from fastapi import APIRouter
from core.ffmpeg import get_decoders, get_encoders, get_formats
router = APIRouter(prefix="/system", tags=["system"])
@router.get("/health")
def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
@router.get("/status")
def system_status():
"""System status for UI."""
return {"status": "ok", "version": "0.1.0"}
@router.get("/worker")
def worker_status():
"""Worker status from gRPC."""
try:
from rpc.client import get_client
client = get_client()
status = client.get_worker_status()
if status:
return status
return {"available": False, "error": "No response from worker"}
except Exception as e:
return {"available": False, "error": str(e)}
@router.get("/ffmpeg/codecs")
def ffmpeg_codecs():
"""Get available FFmpeg encoders and decoders."""
return {
"encoders": get_encoders(),
"decoders": get_decoders(),
}
@router.get("/ffmpeg/formats")
def ffmpeg_formats():
"""Get available FFmpeg muxers and demuxers."""
return get_formats()

View File

@@ -1,10 +0,0 @@
"""API Schemas - GENERATED FILE"""
from .base import BaseSchema
from .asset import AssetCreate, AssetUpdate, AssetResponse
from .asset import AssetStatus
from .preset import PresetCreate, PresetUpdate, PresetResponse
from .job import JobCreate, JobUpdate, JobResponse
from .job import JobStatus
__all__ = ["BaseSchema", "AssetCreate", "AssetUpdate", "AssetResponse", "AssetStatus", "PresetCreate", "PresetUpdate", "PresetResponse", "JobCreate", "JobUpdate", "JobResponse", "JobStatus"]

View File

@@ -1,70 +0,0 @@
"""MediaAsset Schemas - GENERATED FILE"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID
from .base import BaseSchema
class AssetStatus(str, Enum):
PENDING = "pending"
READY = "ready"
ERROR = "error"
class AssetCreate(BaseSchema):
"""AssetCreate schema."""
filename: str
file_path: str
file_size: Optional[int] = None
duration: Optional[float] = None
video_codec: Optional[str] = None
audio_codec: Optional[str] = None
width: Optional[int] = None
height: Optional[int] = None
framerate: Optional[float] = None
bitrate: Optional[int] = None
properties: Dict[str, Any]
comments: str = ""
tags: List[str] = Field(default_factory=list)
class AssetUpdate(BaseSchema):
"""AssetUpdate schema."""
filename: Optional[str] = None
file_path: Optional[str] = None
status: Optional[AssetStatus] = None
error_message: Optional[str] = None
file_size: Optional[int] = None
duration: Optional[float] = None
video_codec: Optional[str] = None
audio_codec: Optional[str] = None
width: Optional[int] = None
height: Optional[int] = None
framerate: Optional[float] = None
bitrate: Optional[int] = None
properties: Optional[Dict[str, Any]] = None
comments: Optional[str] = None
tags: Optional[List[str]] = None
class AssetResponse(BaseSchema):
"""AssetResponse schema."""
id: UUID
filename: str
file_path: str
status: AssetStatus = "AssetStatus.PENDING"
error_message: Optional[str] = None
file_size: Optional[int] = None
duration: Optional[float] = None
video_codec: Optional[str] = None
audio_codec: Optional[str] = None
width: Optional[int] = None
height: Optional[int] = None
framerate: Optional[float] = None
bitrate: Optional[int] = None
properties: Dict[str, Any]
comments: str = ""
tags: List[str] = Field(default_factory=list)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None

View File

@@ -1,8 +0,0 @@
"""Pydantic Base Schema - GENERATED FILE"""
from pydantic import BaseModel, ConfigDict
class BaseSchema(BaseModel):
"""Base schema with ORM mode."""
model_config = ConfigDict(from_attributes=True)

View File

@@ -104,6 +104,13 @@ class CreateJobInput(graphene.InputObjectType):
priority = graphene.Int(default_value=0)
class UpdateAssetInput(graphene.InputObjectType):
"""Request body for updating asset metadata."""
comments = graphene.String()
tags = graphene.List(graphene.String)
class SystemStatusType(graphene.ObjectType):
"""System status response."""
@@ -120,6 +127,12 @@ class ScanResultType(graphene.ObjectType):
files = graphene.List(graphene.String)
class DeleteResultType(graphene.ObjectType):
"""Result of a delete operation."""
ok = graphene.Boolean()
class WorkerStatusType(graphene.ObjectType):
"""Worker health and capabilities."""

View File

@@ -1,83 +0,0 @@
"""TranscodeJob Schemas - GENERATED FILE"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID
from .base import BaseSchema
class JobStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class JobCreate(BaseSchema):
"""JobCreate schema."""
source_asset_id: UUID
preset_id: Optional[UUID] = None
preset_snapshot: Dict[str, Any]
trim_start: Optional[float] = None
trim_end: Optional[float] = None
output_filename: str = ""
output_path: Optional[str] = None
output_asset_id: Optional[UUID] = None
progress: float = 0.0
current_frame: Optional[int] = None
current_time: Optional[float] = None
speed: Optional[str] = None
celery_task_id: Optional[str] = None
execution_arn: Optional[str] = None
priority: int = 0
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
class JobUpdate(BaseSchema):
"""JobUpdate schema."""
source_asset_id: Optional[UUID] = None
preset_id: Optional[UUID] = None
preset_snapshot: Optional[Dict[str, Any]] = None
trim_start: Optional[float] = None
trim_end: Optional[float] = None
output_filename: Optional[str] = None
output_path: Optional[str] = None
output_asset_id: Optional[UUID] = None
status: Optional[JobStatus] = None
progress: Optional[float] = None
current_frame: Optional[int] = None
current_time: Optional[float] = None
speed: Optional[str] = None
error_message: Optional[str] = None
celery_task_id: Optional[str] = None
execution_arn: Optional[str] = None
priority: Optional[int] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
class JobResponse(BaseSchema):
"""JobResponse schema."""
id: UUID
source_asset_id: UUID
preset_id: Optional[UUID] = None
preset_snapshot: Dict[str, Any]
trim_start: Optional[float] = None
trim_end: Optional[float] = None
output_filename: str = ""
output_path: Optional[str] = None
output_asset_id: Optional[UUID] = None
status: JobStatus = "JobStatus.PENDING"
progress: float = 0.0
current_frame: Optional[int] = None
current_time: Optional[float] = None
speed: Optional[str] = None
error_message: Optional[str] = None
celery_task_id: Optional[str] = None
execution_arn: Optional[str] = None
priority: int = 0
created_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None

View File

@@ -1,66 +0,0 @@
"""TranscodePreset Schemas - GENERATED FILE"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID
from .base import BaseSchema
class PresetCreate(BaseSchema):
"""PresetCreate schema."""
name: str
description: str = ""
is_builtin: bool = False
container: str = "mp4"
video_codec: str = "libx264"
video_bitrate: Optional[str] = None
video_crf: Optional[int] = None
video_preset: Optional[str] = None
resolution: Optional[str] = None
framerate: Optional[float] = None
audio_codec: str = "aac"
audio_bitrate: Optional[str] = None
audio_channels: Optional[int] = None
audio_samplerate: Optional[int] = None
extra_args: List[str] = Field(default_factory=list)
class PresetUpdate(BaseSchema):
"""PresetUpdate schema."""
name: Optional[str] = None
description: Optional[str] = None
is_builtin: Optional[bool] = None
container: Optional[str] = None
video_codec: Optional[str] = None
video_bitrate: Optional[str] = None
video_crf: Optional[int] = None
video_preset: Optional[str] = None
resolution: Optional[str] = None
framerate: Optional[float] = None
audio_codec: Optional[str] = None
audio_bitrate: Optional[str] = None
audio_channels: Optional[int] = None
audio_samplerate: Optional[int] = None
extra_args: Optional[List[str]] = None
class PresetResponse(BaseSchema):
"""PresetResponse schema."""
id: UUID
name: str
description: str = ""
is_builtin: bool = False
container: str = "mp4"
video_codec: str = "libx264"
video_bitrate: Optional[str] = None
video_crf: Optional[int] = None
video_preset: Optional[str] = None
resolution: Optional[str] = None
framerate: Optional[float] = None
audio_codec: str = "aac"
audio_bitrate: Optional[str] = None
audio_channels: Optional[int] = None
audio_samplerate: Optional[int] = None
extra_args: List[str] = Field(default_factory=list)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None