""" Job endpoints - transcode/trim job management. """ import os from pathlib import Path from typing import Optional from uuid import UUID from fastapi import APIRouter, Depends, Header, HTTPException, Query from api.deps import get_asset, get_job, get_preset from api.schema import JobCreate, JobResponse router = APIRouter(prefix="/jobs", tags=["jobs"]) CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "") @router.post("/", response_model=JobResponse, status_code=201) def create_job(data: JobCreate): """ Create a transcode or trim job. - With preset_id: Full transcode using preset settings - Without preset_id but with trim_start/end: Trim only (stream copy) """ from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset # Get source asset try: source = MediaAsset.objects.get(id=data.source_asset_id) except MediaAsset.DoesNotExist: raise HTTPException(status_code=404, detail="Source asset not found") # Get preset if specified preset = None preset_snapshot = {} if data.preset_id: try: preset = TranscodePreset.objects.get(id=data.preset_id) preset_snapshot = { "name": preset.name, "container": preset.container, "video_codec": preset.video_codec, "video_bitrate": preset.video_bitrate, "video_crf": preset.video_crf, "video_preset": preset.video_preset, "resolution": preset.resolution, "framerate": preset.framerate, "audio_codec": preset.audio_codec, "audio_bitrate": preset.audio_bitrate, "audio_channels": preset.audio_channels, "audio_samplerate": preset.audio_samplerate, "extra_args": preset.extra_args, } except TranscodePreset.DoesNotExist: raise HTTPException(status_code=404, detail="Preset not found") # Validate trim-only job if not preset and not data.trim_start and not data.trim_end: raise HTTPException( status_code=400, detail="Must specify preset_id or trim_start/trim_end" ) # Generate output filename - stored as S3 key in output bucket output_filename = data.output_filename if not output_filename: stem = Path(source.filename).stem ext = preset_snapshot.get("container", "mp4") if preset else "mp4" output_filename = f"{stem}_output.{ext}" # Create job job = TranscodeJob.objects.create( source_asset_id=source.id, preset_id=preset.id if preset else None, preset_snapshot=preset_snapshot, trim_start=data.trim_start, trim_end=data.trim_end, output_filename=output_filename, output_path=output_filename, # S3 key in output bucket priority=data.priority or 0, ) # Dispatch based on executor mode executor_mode = os.environ.get("MPR_EXECUTOR", "local") if executor_mode == "lambda": _dispatch_lambda(job, source, preset_snapshot) else: _dispatch_celery(job, source, preset_snapshot) return job def _dispatch_celery(job, source, preset_snapshot): """Dispatch job to Celery worker.""" from task.tasks import run_transcode_job result = run_transcode_job.delay( job_id=str(job.id), source_key=source.file_path, output_key=job.output_filename, preset=preset_snapshot or None, trim_start=job.trim_start, trim_end=job.trim_end, duration=source.duration, ) job.celery_task_id = result.id job.save(update_fields=["celery_task_id"]) def _dispatch_lambda(job, source, preset_snapshot): """Dispatch job to AWS Step Functions.""" from task.executor import get_executor executor = get_executor() executor.run( job_id=str(job.id), source_path=source.file_path, output_path=job.output_filename, preset=preset_snapshot or None, trim_start=job.trim_start, trim_end=job.trim_end, duration=source.duration, ) @router.post("/{job_id}/callback") def job_callback( job_id: UUID, payload: dict, x_api_key: Optional[str] = Header(None), ): """ Callback endpoint for Lambda to report job completion. Protected by API key. """ if CALLBACK_API_KEY and x_api_key != CALLBACK_API_KEY: raise HTTPException(status_code=403, detail="Invalid API key") from django.utils import timezone from mpr.media_assets.models import TranscodeJob try: job = TranscodeJob.objects.get(id=job_id) except TranscodeJob.DoesNotExist: raise HTTPException(status_code=404, detail="Job not found") status = payload.get("status", "failed") job.status = status job.progress = 100.0 if status == "completed" else job.progress update_fields = ["status", "progress"] if payload.get("error"): job.error_message = payload["error"] update_fields.append("error_message") if status == "completed": job.completed_at = timezone.now() update_fields.append("completed_at") elif status == "failed": job.completed_at = timezone.now() update_fields.append("completed_at") job.save(update_fields=update_fields) return {"ok": True} @router.get("/", response_model=list[JobResponse]) def list_jobs( status: Optional[str] = Query(None, description="Filter by status"), source_asset_id: Optional[UUID] = Query(None), limit: int = Query(50, ge=1, le=100), offset: int = Query(0, ge=0), ): """List jobs with optional filtering.""" from mpr.media_assets.models import TranscodeJob qs = TranscodeJob.objects.all() if status: qs = qs.filter(status=status) if source_asset_id: qs = qs.filter(source_asset_id=source_asset_id) return list(qs[offset : offset + limit]) @router.get("/{job_id}", response_model=JobResponse) def get_job_detail(job_id: UUID, job=Depends(get_job)): """Get job details including progress.""" return job @router.get("/{job_id}/progress") def get_job_progress(job_id: UUID, job=Depends(get_job)): """Get real-time job progress.""" return { "job_id": str(job.id), "status": job.status, "progress": job.progress, "current_frame": job.current_frame, "current_time": job.current_time, "speed": job.speed, } @router.post("/{job_id}/cancel", response_model=JobResponse) def cancel_job(job_id: UUID, job=Depends(get_job)): """Cancel a pending or processing job.""" if job.status not in ("pending", "processing"): raise HTTPException( status_code=400, detail=f"Cannot cancel job with status: {job.status}" ) job.status = "cancelled" job.save(update_fields=["status"]) return job @router.post("/{job_id}/retry", response_model=JobResponse) def retry_job(job_id: UUID, job=Depends(get_job)): """Retry a failed job.""" if job.status != "failed": raise HTTPException(status_code=400, detail="Only failed jobs can be retried") job.status = "pending" job.progress = 0 job.error_message = None job.save(update_fields=["status", "progress", "error_message"]) return job