# File: mediaproc/worker/tasks.py (Python; listed as 97 lines, 2.7 KiB in the file viewer)

"""
Celery tasks for job processing.
"""
import logging
import os
from typing import Any, Dict, Optional
from celery import shared_task
from grpc.server import update_job_progress
from worker.executor import get_executor
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Root directory for media files. Read once at import time from the
# MEDIA_ROOT environment variable; falls back to "/app/media" when unset.
MEDIA_ROOT = os.environ.get("MEDIA_ROOT", "/app/media")
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def run_transcode_job(
    self,
    job_id: str,
    source_path: str,
    output_path: str,
    preset: Optional[Dict[str, Any]] = None,
    trim_start: Optional[float] = None,
    trim_end: Optional[float] = None,
    duration: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Run a single transcode/trim job through the executor from ``get_executor()``.

    Job state is published through ``update_job_progress``: "processing" at
    the start and on every executor progress callback, "completed" on
    success, and "failed" (with the error text) whenever an exception is
    caught. A falsy result from the executor is treated as a failure.

    Args:
        job_id: Unique job identifier.
        source_path: Path to the source file.
        output_path: Path the output file is written to.
        preset: Optional transcode preset dict, passed through to the executor.
        trim_start: Optional trim start time in seconds.
        trim_end: Optional trim end time in seconds.
        duration: Source duration, passed to the executor for progress
            calculation.

    Returns:
        On success, a dict with "status" ("completed"), "job_id" and
        "output_path". When the job fails and no retries remain, a dict with
        "status" ("failed"), "job_id" and "error".

    Raises:
        Whatever ``self.retry(exc=...)`` returns, while
        ``self.request.retries`` is still below ``self.max_retries``.
    """
    logger.info(f"Starting job {job_id}: {source_path} -> {output_path}")

    # Announce that work has begun before the executor is even created.
    update_job_progress(job_id, progress=0, status="processing")

    def _publish_progress(percent: int, details: Dict[str, Any]) -> None:
        """Relay an executor progress tick to the job progress state."""
        position = details.get("time", 0.0)
        update_job_progress(
            job_id,
            progress=percent,
            current_time=position,
            status="processing",
        )

    try:
        runner = get_executor()
        ok = runner.run(
            job_id=job_id,
            source_path=source_path,
            output_path=output_path,
            preset=preset,
            trim_start=trim_start,
            trim_end=trim_end,
            duration=duration,
            progress_callback=_publish_progress,
        )

        # Raised inside the try on purpose: a falsy executor result goes
        # through the same failure/retry handling as any other exception.
        if not ok:
            raise RuntimeError("Executor returned False")

        logger.info(f"Job {job_id} completed successfully")
        update_job_progress(job_id, progress=100, status="completed")
        return {
            "status": "completed",
            "job_id": job_id,
            "output_path": output_path,
        }
    except Exception as exc:
        logger.exception(f"Job {job_id} failed: {exc}")
        # NOTE(review): the job is published as "failed" even when a retry
        # follows immediately below - confirm that status consumers expect it.
        update_job_progress(job_id, progress=0, status="failed", error=str(exc))

        # Every caught exception is retried while attempts remain; no
        # distinction is made here between transient and permanent errors.
        attempts_remain = self.request.retries < self.max_retries
        if attempts_remain:
            raise self.retry(exc=exc)

        return {
            "status": "failed",
            "job_id": job_id,
            "error": str(exc),
        }