"""
Executor abstraction for job processing.

Determines WHERE jobs run:
- LocalExecutor: delegates to registered Handler (default)
- LambdaExecutor: AWS Step Functions
- GCPExecutor: Google Cloud Run Jobs
"""
|
|
|
|
import logging
import os
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Optional
|
|
|
|
# Configuration from environment
|
|
MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local")
|
|
|
|
|
|
class Executor(ABC):
    """Common interface for job executors.

    Concrete subclasses decide WHERE a job actually runs: in-process
    (LocalExecutor), AWS Step Functions (LambdaExecutor), or Google
    Cloud Run Jobs (GCPExecutor).
    """

    @abstractmethod
    def run(
        self,
        job_type: str,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """
        Execute a single job.

        Args:
            job_type: Type of job ("transcode", "chunk", etc.)
            job_id: Unique job identifier
            payload: Job-type-specific configuration dict
            progress_callback: Called with (percent, details_dict)

        Returns:
            True if successful
        """
|
|
|
|
|
|
class LocalExecutor(Executor):
    """Run jobs in-process by delegating to the handler registry."""

    def run(
        self,
        job_type: str,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """Look up the handler registered for ``job_type`` and process the job.

        Returns True only when the handler reports a "completed" status.
        """
        # Imported lazily to avoid a circular import at module load time.
        from .registry import get_handler

        outcome = get_handler(job_type).process(
            job_id=job_id,
            payload=payload,
            progress_callback=progress_callback,
        )
        return outcome.get("status") == "completed"
|
|
|
|
|
|
class LambdaExecutor(Executor):
    """Execute jobs via AWS Step Functions + Lambda.

    ``run`` only *starts* a Step Functions execution (fire-and-forget);
    progress and completion are reported back asynchronously by the
    workflow through the configured callback URL.
    """

    def __init__(self):
        # Imported lazily so this module loads without boto3 installed
        # when a different executor is configured.
        import boto3

        region = os.environ.get("AWS_REGION", "us-east-1")
        self.sfn = boto3.client("stepfunctions", region_name=region)
        # Required: ARN of the state machine that processes jobs.
        self.state_machine_arn = os.environ["STEP_FUNCTION_ARN"]
        # Optional: where the workflow reports progress/completion.
        self.callback_url = os.environ.get("CALLBACK_URL", "")
        self.callback_api_key = os.environ.get("CALLBACK_API_KEY", "")

    def run(
        self,
        job_type: str,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """Start a Step Functions execution for this job.

        Args:
            job_type: Type of job ("transcode", "chunk", etc.)
            job_id: Unique job identifier
            payload: Job-type-specific configuration dict
            progress_callback: Unused here -- progress arrives via the
                callback URL, not an in-process callback.

        Returns:
            True once the execution has been started.
        """
        import json

        sfn_payload = {
            "job_type": job_type,
            "job_id": job_id,
            **payload,
            # Listed after **payload so the payload cannot override them.
            "callback_url": self.callback_url,
            "api_key": self.callback_api_key,
        }

        # NOTE(review): Step Functions execution names must be <= 80 chars,
        # use a restricted charset, and be unique per state machine for 90
        # days -- confirm job_ids are short, safe, and never reused.
        response = self.sfn.start_execution(
            stateMachineArn=self.state_machine_arn,
            name=f"mpr-{job_id}",
            input=json.dumps(sfn_payload),
        )

        # Best effort: persist the execution ARN so the job can be traced.
        # A bookkeeping failure must not fail the (already started) job,
        # but it should not be silent either.
        execution_arn = response["executionArn"]
        try:
            from core.db import update_job_fields

            update_job_fields(job_id, execution_arn=execution_arn)
        except Exception:
            logging.getLogger(__name__).warning(
                "Failed to record execution ARN %s for job %s",
                execution_arn,
                job_id,
                exc_info=True,
            )

        return True
|
|
|
|
|
|
class GCPExecutor(Executor):
    """Execute jobs via Google Cloud Run Jobs.

    ``run`` only *triggers* a Cloud Run Job execution (fire-and-forget);
    progress and completion are reported back asynchronously by the job
    through the configured callback URL.
    """

    def __init__(self):
        # Imported lazily so this module loads without google-cloud-run
        # installed when a different executor is configured.
        from google.cloud import run_v2

        self.client = run_v2.JobsClient()
        # Required project/job settings.
        self.project_id = os.environ["GCP_PROJECT_ID"]
        self.region = os.environ.get("GCP_REGION", "us-central1")
        self.job_name = os.environ["CLOUD_RUN_JOB"]
        # Optional: where the job reports progress/completion.
        self.callback_url = os.environ.get("CALLBACK_URL", "")
        self.callback_api_key = os.environ.get("CALLBACK_API_KEY", "")

    def run(
        self,
        job_type: str,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """Trigger a Cloud Run Job execution for this job.

        Args:
            job_type: Type of job ("transcode", "chunk", etc.)
            job_id: Unique job identifier
            payload: Job-type-specific configuration dict
            progress_callback: Unused here -- progress arrives via the
                callback URL, not an in-process callback.

        Returns:
            True once the execution has been triggered.
        """
        import json

        from google.cloud import run_v2

        gcp_payload = {
            "job_type": job_type,
            "job_id": job_id,
            **payload,
            # Listed after **payload so the payload cannot override them.
            "callback_url": self.callback_url,
            "api_key": self.callback_api_key,
        }

        job_path = (
            f"projects/{self.project_id}/locations/{self.region}/jobs/{self.job_name}"
        )

        # The full job payload is delivered to the container through a single
        # environment-variable override; the container is expected to read
        # MPR_JOB_PAYLOAD at startup. NOTE(review): confirm payloads stay
        # within Cloud Run's env-var size limits.
        request = run_v2.RunJobRequest(
            name=job_path,
            overrides=run_v2.RunJobRequest.Overrides(
                container_overrides=[
                    run_v2.RunJobRequest.Overrides.ContainerOverride(
                        env=[
                            run_v2.EnvVar(
                                name="MPR_JOB_PAYLOAD",
                                value=json.dumps(gcp_payload),
                            )
                        ]
                    )
                ]
            ),
        )

        operation = self.client.run_job(request=request)
        execution_name = operation.metadata.name

        # Best effort: persist the execution name (stored in the same
        # execution_arn field the Lambda executor uses) for tracing.
        # A bookkeeping failure must not fail the (already triggered) job,
        # but it should not be silent either.
        try:
            from core.db import update_job_fields

            update_job_fields(job_id, execution_arn=execution_name)
        except Exception:
            logging.getLogger(__name__).warning(
                "Failed to record execution name %s for job %s",
                execution_name,
                job_id,
                exc_info=True,
            )

        return True
|
|
|
|
|
|
# Executor registry: maps MPR_EXECUTOR values to executor classes.
_executors: Dict[str, type] = {
    "local": LocalExecutor,
    "lambda": LambdaExecutor,
    "gcp": GCPExecutor,
}

# Lazily-created singleton; see get_executor().
_executor_instance: Optional[Executor] = None


def get_executor() -> Executor:
    """Get the configured executor instance, creating it on first use.

    The backend is chosen by the MPR_EXECUTOR environment variable (read
    once at import time; case-insensitive). NOTE: the first call is not
    thread-safe -- concurrent first calls could construct two instances.

    Returns:
        The process-wide Executor singleton.

    Raises:
        ValueError: if MPR_EXECUTOR names an unknown executor type.
        KeyError: propagated from an executor __init__ when a required
            environment variable (e.g. STEP_FUNCTION_ARN) is missing.
    """
    global _executor_instance

    if _executor_instance is None:
        executor_type = MPR_EXECUTOR.lower()
        if executor_type not in _executors:
            # Name the valid options so a typo in MPR_EXECUTOR is
            # immediately diagnosable from the error alone.
            valid = ", ".join(sorted(_executors))
            raise ValueError(
                f"Unknown executor type: {executor_type} (expected one of: {valid})"
            )
        _executor_instance = _executors[executor_type]()

    return _executor_instance
|