179 lines
5.4 KiB
Python
179 lines
5.4 KiB
Python
"""
|
|
Job endpoints - transcode/trim job management.
|
|
"""
|
|
|
|
import json
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
|
|
from api.deps import get_asset, get_job, get_preset
|
|
from api.schemas import JobCreate, JobResponse
|
|
|
|
router = APIRouter(prefix="/jobs", tags=["jobs"])
|
|
|
|
|
|
@router.post("/", response_model=JobResponse, status_code=201)
|
|
def create_job(data: JobCreate):
|
|
"""
|
|
Create a transcode or trim job.
|
|
|
|
- With preset_id: Full transcode using preset settings
|
|
- Without preset_id but with trim_start/end: Trim only (stream copy)
|
|
"""
|
|
from mpr.media_assets.models import MediaAsset, TranscodeJob, TranscodePreset
|
|
|
|
# Get source asset
|
|
try:
|
|
source = MediaAsset.objects.get(id=data.source_asset_id)
|
|
except MediaAsset.DoesNotExist:
|
|
raise HTTPException(status_code=404, detail="Source asset not found")
|
|
|
|
# Get preset if specified
|
|
preset = None
|
|
preset_snapshot = {}
|
|
if data.preset_id:
|
|
try:
|
|
preset = TranscodePreset.objects.get(id=data.preset_id)
|
|
# Snapshot preset at job creation time
|
|
preset_snapshot = {
|
|
"name": preset.name,
|
|
"container": preset.container,
|
|
"video_codec": preset.video_codec,
|
|
"video_bitrate": preset.video_bitrate,
|
|
"video_crf": preset.video_crf,
|
|
"video_preset": preset.video_preset,
|
|
"resolution": preset.resolution,
|
|
"framerate": preset.framerate,
|
|
"audio_codec": preset.audio_codec,
|
|
"audio_bitrate": preset.audio_bitrate,
|
|
"audio_channels": preset.audio_channels,
|
|
"audio_samplerate": preset.audio_samplerate,
|
|
"extra_args": preset.extra_args,
|
|
}
|
|
except TranscodePreset.DoesNotExist:
|
|
raise HTTPException(status_code=404, detail="Preset not found")
|
|
|
|
# Validate trim-only job
|
|
if not preset and not data.trim_start and not data.trim_end:
|
|
raise HTTPException(
|
|
status_code=400, detail="Must specify preset_id or trim_start/trim_end"
|
|
)
|
|
|
|
# Generate output filename and path
|
|
import os
|
|
from pathlib import Path
|
|
|
|
output_filename = data.output_filename
|
|
if not output_filename:
|
|
stem = Path(source.filename).stem
|
|
ext = preset_snapshot.get("container", "mp4") if preset else "mp4"
|
|
output_filename = f"{stem}_output.{ext}"
|
|
|
|
media_out = os.environ.get("MEDIA_OUT", "/app/media/out")
|
|
output_path = str(Path(media_out) / output_filename)
|
|
|
|
media_in = os.environ.get("MEDIA_IN", "/app/media/in")
|
|
source_path = str(Path(media_in) / source.file_path)
|
|
|
|
# Create job
|
|
job = TranscodeJob.objects.create(
|
|
source_asset_id=source.id,
|
|
preset_id=preset.id if preset else None,
|
|
preset_snapshot=preset_snapshot,
|
|
trim_start=data.trim_start,
|
|
trim_end=data.trim_end,
|
|
output_filename=output_filename,
|
|
output_path=output_path,
|
|
priority=data.priority or 0,
|
|
)
|
|
|
|
# Dispatch to Celery
|
|
from task.tasks import run_transcode_job
|
|
|
|
result = run_transcode_job.delay(
|
|
job_id=str(job.id),
|
|
source_path=source_path,
|
|
output_path=output_path,
|
|
preset=preset_snapshot or None,
|
|
trim_start=data.trim_start,
|
|
trim_end=data.trim_end,
|
|
duration=source.duration,
|
|
)
|
|
job.celery_task_id = result.id
|
|
job.save(update_fields=["celery_task_id"])
|
|
|
|
return job
|
|
|
|
|
|
@router.get("/", response_model=list[JobResponse])
|
|
def list_jobs(
|
|
status: Optional[str] = Query(None, description="Filter by status"),
|
|
source_asset_id: Optional[UUID] = Query(None),
|
|
limit: int = Query(50, ge=1, le=100),
|
|
offset: int = Query(0, ge=0),
|
|
):
|
|
"""List jobs with optional filtering."""
|
|
from mpr.media_assets.models import TranscodeJob
|
|
|
|
qs = TranscodeJob.objects.all()
|
|
|
|
if status:
|
|
qs = qs.filter(status=status)
|
|
if source_asset_id:
|
|
qs = qs.filter(source_asset_id=source_asset_id)
|
|
|
|
return list(qs[offset : offset + limit])
|
|
|
|
|
|
@router.get("/{job_id}", response_model=JobResponse)
|
|
def get_job_detail(job_id: UUID, job=Depends(get_job)):
|
|
"""Get job details including progress."""
|
|
return job
|
|
|
|
|
|
@router.get("/{job_id}/progress")
|
|
def get_job_progress(job_id: UUID, job=Depends(get_job)):
|
|
"""Get real-time job progress."""
|
|
return {
|
|
"job_id": str(job.id),
|
|
"status": job.status,
|
|
"progress": job.progress,
|
|
"current_frame": job.current_frame,
|
|
"current_time": job.current_time,
|
|
"speed": job.speed,
|
|
}
|
|
|
|
|
|
@router.post("/{job_id}/cancel", response_model=JobResponse)
|
|
def cancel_job(job_id: UUID, job=Depends(get_job)):
|
|
"""Cancel a pending or processing job."""
|
|
if job.status not in ("pending", "processing"):
|
|
raise HTTPException(
|
|
status_code=400, detail=f"Cannot cancel job with status: {job.status}"
|
|
)
|
|
|
|
# TODO: Cancel via gRPC
|
|
|
|
job.status = "cancelled"
|
|
job.save(update_fields=["status"])
|
|
|
|
return job
|
|
|
|
|
|
@router.post("/{job_id}/retry", response_model=JobResponse)
|
|
def retry_job(job_id: UUID, job=Depends(get_job)):
|
|
"""Retry a failed job."""
|
|
if job.status != "failed":
|
|
raise HTTPException(status_code=400, detail="Only failed jobs can be retried")
|
|
|
|
job.status = "pending"
|
|
job.progress = 0
|
|
job.error_message = None
|
|
job.save(update_fields=["status", "progress", "error_message"])
|
|
|
|
# TODO: Resubmit via gRPC
|
|
|
|
return job
|