""" Job endpoints - transcode/trim job management. """ import json from typing import Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query from api.deps import get_asset, get_job, get_preset from api.schemas import JobCreate, JobResponse router = APIRouter(prefix="/jobs", tags=["jobs"]) @router.post("/", response_model=JobResponse, status_code=201) def create_job(data: JobCreate): """ Create a transcode or trim job. - With preset_id: Full transcode using preset settings - Without preset_id but with trim_start/end: Trim only (stream copy) """ from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset # Get source asset try: source = MediaAsset.objects.get(id=data.source_asset_id) except MediaAsset.DoesNotExist: raise HTTPException(status_code=404, detail="Source asset not found") # Get preset if specified preset = None preset_snapshot = {} if data.preset_id: try: preset = TranscodePreset.objects.get(id=data.preset_id) # Snapshot preset at job creation time preset_snapshot = { "name": preset.name, "container": preset.container, "video_codec": preset.video_codec, "video_bitrate": preset.video_bitrate, "video_crf": preset.video_crf, "video_preset": preset.video_preset, "resolution": preset.resolution, "framerate": preset.framerate, "audio_codec": preset.audio_codec, "audio_bitrate": preset.audio_bitrate, "audio_channels": preset.audio_channels, "audio_samplerate": preset.audio_samplerate, "extra_args": preset.extra_args, } except TranscodePreset.DoesNotExist: raise HTTPException(status_code=404, detail="Preset not found") # Validate trim-only job if not preset and not data.trim_start and not data.trim_end: raise HTTPException( status_code=400, detail="Must specify preset_id or trim_start/trim_end" ) # Generate output filename and path import os from pathlib import Path output_filename = data.output_filename if not output_filename: stem = Path(source.filename).stem ext = preset_snapshot.get("container", "mp4") if preset else "mp4" output_filename = f"{stem}_output.{ext}" media_out = os.environ.get("MEDIA_OUT", "/app/media/out") output_path = str(Path(media_out) / output_filename) media_in = os.environ.get("MEDIA_IN", "/app/media/in") source_path = str(Path(media_in) / source.file_path) # Create job job = TranscodeJob.objects.create( source_asset_id=source.id, preset_id=preset.id if preset else None, preset_snapshot=preset_snapshot, trim_start=data.trim_start, trim_end=data.trim_end, output_filename=output_filename, output_path=output_path, priority=data.priority or 0, ) # Dispatch to Celery from task.tasks import run_transcode_job result = run_transcode_job.delay( job_id=str(job.id), source_path=source_path, output_path=output_path, preset=preset_snapshot or None, trim_start=data.trim_start, trim_end=data.trim_end, duration=source.duration, ) job.celery_task_id = result.id job.save(update_fields=["celery_task_id"]) return job @router.get("/", response_model=list[JobResponse]) def list_jobs( status: Optional[str] = Query(None, description="Filter by status"), source_asset_id: Optional[UUID] = Query(None), limit: int = Query(50, ge=1, le=100), offset: int = Query(0, ge=0), ): """List jobs with optional filtering.""" from mpr.media_assets.models import TranscodeJob qs = TranscodeJob.objects.all() if status: qs = qs.filter(status=status) if source_asset_id: qs = qs.filter(source_asset_id=source_asset_id) return list(qs[offset : offset + limit]) @router.get("/{job_id}", response_model=JobResponse) def get_job_detail(job_id: UUID, job=Depends(get_job)): """Get job details including progress.""" return job @router.get("/{job_id}/progress") def get_job_progress(job_id: UUID, job=Depends(get_job)): """Get real-time job progress.""" return { "job_id": str(job.id), "status": job.status, "progress": job.progress, "current_frame": job.current_frame, "current_time": job.current_time, "speed": job.speed, } @router.post("/{job_id}/cancel", response_model=JobResponse) def cancel_job(job_id: UUID, job=Depends(get_job)): """Cancel a pending or processing job.""" if job.status not in ("pending", "processing"): raise HTTPException( status_code=400, detail=f"Cannot cancel job with status: {job.status}" ) # TODO: Cancel via gRPC job.status = "cancelled" job.save(update_fields=["status"]) return job @router.post("/{job_id}/retry", response_model=JobResponse) def retry_job(job_id: UUID, job=Depends(get_job)): """Retry a failed job.""" if job.status != "failed": raise HTTPException(status_code=400, detail="Only failed jobs can be retried") job.status = "pending" job.progress = 0 job.error_message = None job.save(update_fields=["status", "progress", "error_message"]) # TODO: Resubmit via gRPC return job