"""
Google Cloud Run Job handler for media transcoding.

Reads job payload from the MPR_JOB_PAYLOAD env var (injected by GCPExecutor),
downloads source from S3-compatible storage (GCS via HMAC + S3 API),
runs FFmpeg, uploads result, and calls back to the API.

Uses core/storage and core/ffmpeg — same modules as the Celery worker.
No cloud-provider SDK required here; storage goes through core.storage
(boto3 + S3 compat).

Entry point: python -m task.gcp_handler (set as Cloud Run Job command)
"""

import json
import logging
import os
import sys
import tempfile
from pathlib import Path

import requests

from core.ffmpeg.transcode import TranscodeConfig, transcode
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _build_config(preset, tmp_source, tmp_output, trim_start, trim_end) -> TranscodeConfig:
    """Build the TranscodeConfig for one job.

    With a truthy ``preset`` (expected to be a dict), its values are mapped
    onto the config, falling back to ``libx264`` / ``aac`` / ``mp4`` for the
    video codec, audio codec and container. Without a preset both codecs are
    set to ``"copy"`` (presumably an FFmpeg stream copy, i.e. no re-encode —
    confirm against core.ffmpeg.transcode).
    """
    if preset:
        return TranscodeConfig(
            input_path=tmp_source,
            output_path=tmp_output,
            video_codec=preset.get("video_codec", "libx264"),
            video_bitrate=preset.get("video_bitrate"),
            video_crf=preset.get("video_crf"),
            video_preset=preset.get("video_preset"),
            resolution=preset.get("resolution"),
            framerate=preset.get("framerate"),
            audio_codec=preset.get("audio_codec", "aac"),
            audio_bitrate=preset.get("audio_bitrate"),
            audio_channels=preset.get("audio_channels"),
            audio_samplerate=preset.get("audio_samplerate"),
            container=preset.get("container", "mp4"),
            extra_args=preset.get("extra_args", []),
            trim_start=trim_start,
            trim_end=trim_end,
        )

    return TranscodeConfig(
        input_path=tmp_source,
        output_path=tmp_output,
        video_codec="copy",
        audio_codec="copy",
        trim_start=trim_start,
        trim_end=trim_end,
    )


def main() -> None:
    """Run one transcode job described by the MPR_JOB_PAYLOAD env var.

    The payload is a JSON object with required keys ``job_id``,
    ``source_key`` and ``output_key`` and optional keys ``preset``,
    ``trim_start``, ``trim_end``, ``duration``, ``callback_url`` and
    ``api_key``.

    Always terminates via ``sys.exit``: code 0 after a successful upload,
    code 1 when the payload env var is missing or any job step fails.
    A malformed payload (invalid JSON or a missing required key) is not
    caught here and propagates as the underlying exception.
    """
    raw = os.environ.get("MPR_JOB_PAYLOAD")
    if not raw:
        logger.error("MPR_JOB_PAYLOAD not set")
        sys.exit(1)

    event = json.loads(raw)
    job_id = event["job_id"]
    source_key = event["source_key"]
    output_key = event["output_key"]
    preset = event.get("preset")
    trim_start = event.get("trim_start")
    trim_end = event.get("trim_end")
    duration = event.get("duration")
    callback_url = event.get("callback_url", "")
    api_key = event.get("api_key", "")

    logger.info("Starting job %s: %s -> %s", job_id, source_key, output_key)

    # Both paths start unset so the cleanup below can tell which temp files
    # were actually created before a failure.
    tmp_source = None
    tmp_output = None

    try:
        # Download and temp-file creation live inside the try so that a
        # failure here is reported to the API like any other job failure.
        tmp_source = download_to_temp(BUCKET_IN, source_key)

        ext_out = Path(output_key).suffix or ".mp4"
        fd, tmp_output = tempfile.mkstemp(suffix=ext_out)
        os.close(fd)  # only the path is needed; the output is written by path

        config = _build_config(preset, tmp_source, tmp_output, trim_start, trim_end)

        success = transcode(config, duration=duration)
        if not success:
            raise RuntimeError("Transcode returned False")

        logger.info("Uploading to %s/%s", BUCKET_OUT, output_key)
        upload_file(tmp_output, BUCKET_OUT, output_key)
    except Exception as e:
        logger.exception("Job %s failed: %s", job_id, e)
        _callback(callback_url, job_id, api_key, {"status": "failed", "error": str(e)})
        sys.exit(1)
    else:
        _callback(callback_url, job_id, api_key, {"status": "completed"})
        logger.info("Job %s completed", job_id)
        sys.exit(0)
    finally:
        # Runs even while SystemExit propagates from the branches above.
        for path in (tmp_source, tmp_output):
            if path is None:
                continue
            try:
                os.unlink(path)
            except OSError:
                # Best-effort cleanup: the file may already be gone.
                pass


def _callback(callback_url: str, job_id: str, api_key: str, payload: dict) -> None:
    """POST a job status update to ``{callback_url}/jobs/{job_id}/callback``.

    Does nothing when ``callback_url`` is empty. ``api_key``, when non-empty,
    is sent in the ``X-API-Key`` header. Delivery is best-effort: any error
    is logged as a warning and never raised to the caller.
    """
    if not callback_url:
        return

    try:
        # Tolerate a trailing slash on the base URL to avoid "//jobs/...".
        url = f"{callback_url.rstrip('/')}/jobs/{job_id}/callback"
        headers = {"X-API-Key": api_key} if api_key else {}
        resp = requests.post(url, json=payload, headers=headers, timeout=10)
        # Surface HTTP error statuses at WARNING so a rejected callback is
        # not lost among INFO lines.
        level = logging.WARNING if resp.status_code >= 400 else logging.INFO
        logger.log(level, "Callback response: %s", resp.status_code)
    except Exception as e:
        # Deliberately broad: a callback problem must not change the job outcome.
        logger.warning("Callback failed: %s", e)


if __name__ == "__main__":
    main()