""" GraphQL API using graphene, mounted on FastAPI/Starlette. Provides the same data as the REST API but via GraphQL queries and mutations. Uses Django ORM directly for data access. Types are generated from schema/ via modelgen — see api/schemas/graphql_types.py. """ import os import graphene from api.schemas.graphql import ( CreateJobInput, MediaAssetType, ScanResultType, SystemStatusType, TranscodeJobType, TranscodePresetType, ) from core.storage import BUCKET_IN, list_objects # Media extensions (same as assets route) VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} MEDIA_EXTS = VIDEO_EXTS | AUDIO_EXTS # --------------------------------------------------------------------------- # Queries # --------------------------------------------------------------------------- class Query(graphene.ObjectType): assets = graphene.List( MediaAssetType, status=graphene.String(), search=graphene.String(), ) asset = graphene.Field(MediaAssetType, id=graphene.UUID(required=True)) jobs = graphene.List( TranscodeJobType, status=graphene.String(), source_asset_id=graphene.UUID(), ) job = graphene.Field(TranscodeJobType, id=graphene.UUID(required=True)) presets = graphene.List(TranscodePresetType) system_status = graphene.Field(SystemStatusType) def resolve_assets(self, info, status=None, search=None): from mpr.media_assets.models import MediaAsset qs = MediaAsset.objects.all() if status: qs = qs.filter(status=status) if search: qs = qs.filter(filename__icontains=search) return qs def resolve_asset(self, info, id): from mpr.media_assets.models import MediaAsset try: return MediaAsset.objects.get(id=id) except MediaAsset.DoesNotExist: return None def resolve_jobs(self, info, status=None, source_asset_id=None): from mpr.media_assets.models import TranscodeJob qs = TranscodeJob.objects.all() if status: qs = qs.filter(status=status) if source_asset_id: qs = qs.filter(source_asset_id=source_asset_id) return qs def resolve_job(self, info, id): from mpr.media_assets.models import TranscodeJob try: return TranscodeJob.objects.get(id=id) except TranscodeJob.DoesNotExist: return None def resolve_presets(self, info): from mpr.media_assets.models import TranscodePreset return TranscodePreset.objects.all() def resolve_system_status(self, info): return {"status": "ok", "version": "0.1.0"} # --------------------------------------------------------------------------- # Mutations # --------------------------------------------------------------------------- class ScanMediaFolder(graphene.Mutation): class Arguments: pass Output = ScanResultType def mutate(self, info): from mpr.media_assets.models import MediaAsset objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) existing = set(MediaAsset.objects.values_list("filename", flat=True)) registered = [] skipped = [] for obj in objects: if obj["filename"] in existing: skipped.append(obj["filename"]) continue try: MediaAsset.objects.create( filename=obj["filename"], file_path=obj["key"], file_size=obj["size"], ) registered.append(obj["filename"]) except Exception: pass return ScanResultType( found=len(objects), registered=len(registered), skipped=len(skipped), files=registered, ) class CreateJob(graphene.Mutation): class Arguments: input = CreateJobInput(required=True) Output = TranscodeJobType def mutate(self, info, input): from pathlib import Path from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset try: source = MediaAsset.objects.get(id=input.source_asset_id) except MediaAsset.DoesNotExist: raise Exception("Source asset not found") preset = None preset_snapshot = {} if input.preset_id: try: preset = TranscodePreset.objects.get(id=input.preset_id) preset_snapshot = { "name": preset.name, "container": preset.container, "video_codec": preset.video_codec, "audio_codec": preset.audio_codec, } except TranscodePreset.DoesNotExist: raise Exception("Preset not found") if not preset and not input.trim_start and not input.trim_end: raise Exception("Must specify preset_id or trim_start/trim_end") output_filename = input.output_filename if not output_filename: stem = Path(source.filename).stem ext = preset_snapshot.get("container", "mp4") if preset else "mp4" output_filename = f"{stem}_output.{ext}" job = TranscodeJob.objects.create( source_asset_id=source.id, preset_id=preset.id if preset else None, preset_snapshot=preset_snapshot, trim_start=input.trim_start, trim_end=input.trim_end, output_filename=output_filename, output_path=output_filename, priority=input.priority or 0, ) # Dispatch executor_mode = os.environ.get("MPR_EXECUTOR", "local") if executor_mode == "lambda": from task.executor import get_executor get_executor().run( job_id=str(job.id), source_path=source.file_path, output_path=output_filename, preset=preset_snapshot or None, trim_start=input.trim_start, trim_end=input.trim_end, duration=source.duration, ) else: from task.tasks import run_transcode_job result = run_transcode_job.delay( job_id=str(job.id), source_key=source.file_path, output_key=output_filename, preset=preset_snapshot or None, trim_start=input.trim_start, trim_end=input.trim_end, duration=source.duration, ) job.celery_task_id = result.id job.save(update_fields=["celery_task_id"]) return job class CancelJob(graphene.Mutation): class Arguments: id = graphene.UUID(required=True) Output = TranscodeJobType def mutate(self, info, id): from mpr.media_assets.models import TranscodeJob try: job = TranscodeJob.objects.get(id=id) except TranscodeJob.DoesNotExist: raise Exception("Job not found") if job.status not in ("pending", "processing"): raise Exception(f"Cannot cancel job with status: {job.status}") job.status = "cancelled" job.save(update_fields=["status"]) return job class Mutation(graphene.ObjectType): scan_media_folder = ScanMediaFolder.Field() create_job = CreateJob.Field() cancel_job = CancelJob.Field() # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- schema = graphene.Schema(query=Query, mutation=Mutation)