+```
+
+### Upload Files to GCS
+```bash
+gcloud storage cp video.mp4 gs://mpr-media-in/
+
+# Or with the AWS CLI via the GCS S3-compatibility endpoint (requires HMAC credentials)
+aws --endpoint-url https://storage.googleapis.com s3 cp video.mp4 s3://mpr-media-in/
+```
+
+### Cloud Run Job Handler
+`task/gcp_handler.py` is the Cloud Run Job entrypoint. It reads the job payload
+from `MPR_JOB_PAYLOAD` (injected by `GCPExecutor`), uses `core/storage` for all
+GCS access (S3 compat), and POSTs the completion callback to the API.
+
+Set the Cloud Run Job command to: `python -m task.gcp_handler`
+
## Storage Module
`core/storage.py` provides all S3 operations:
@@ -114,7 +155,14 @@ mutation { scanMediaFolder { found registered skipped files } }
4. Uploads result to `S3_BUCKET_OUT`
5. Calls back to API with result
-Both paths use the same S3 buckets and key structure.
+### Cloud Run Job Mode (GCP)
+1. `GCPExecutor` triggers Cloud Run Job with payload in `MPR_JOB_PAYLOAD`
+2. `task/gcp_handler.py` downloads source from `S3_BUCKET_IN` (GCS S3 compat)
+3. Runs FFmpeg in container
+4. Uploads result to `S3_BUCKET_OUT` (GCS S3 compat)
+5. Calls back to API with result
+
+All three paths use the same S3-compatible bucket names and key structure.
## Supported File Types
diff --git a/docs/index.html b/docs/index.html
index f72e35e..7b5f774 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -9,8 +9,9 @@
MPR - Media Processor
- Media transcoding platform with dual execution modes: local (Celery
- + MinIO) and cloud (AWS Step Functions + Lambda + S3).
+ Media transcoding platform with three execution modes: local (Celery
+ + MinIO), AWS (Step Functions + Lambda + S3), and GCP (Cloud Run
+ Jobs + GCS). Storage is S3-compatible across all environments.
@@ -54,6 +55,21 @@
>Open full size
+
@@ -73,7 +89,11 @@
- AWS (Step Functions, Lambda - cloud mode)
+ AWS (Step Functions, Lambda)
+
+
+
+ GCP (Cloud Run Jobs + GCS)
@@ -81,7 +101,7 @@
- S3 Storage (MinIO local / AWS S3 cloud)
+ S3-compatible Storage (MinIO / AWS S3 / GCS)
diff --git a/modelgen/generator/__init__.py b/modelgen/generator/__init__.py
index 5abc0ac..be49ab4 100644
--- a/modelgen/generator/__init__.py
+++ b/modelgen/generator/__init__.py
@@ -7,17 +7,17 @@ Supported generators:
- TypeScriptGenerator: TypeScript interfaces
- ProtobufGenerator: Protocol Buffer definitions
- PrismaGenerator: Prisma schema
-- GrapheneGenerator: Graphene ObjectType/InputObjectType classes
+- StrawberryGenerator: Strawberry type/input/enum classes
"""
from typing import Dict, Type
from .base import BaseGenerator
from .django import DjangoGenerator
-from .graphene import GrapheneGenerator
from .prisma import PrismaGenerator
from .protobuf import ProtobufGenerator
from .pydantic import PydanticGenerator
+from .strawberry import StrawberryGenerator
from .typescript import TypeScriptGenerator
# Registry of available generators
@@ -29,14 +29,14 @@ GENERATORS: Dict[str, Type[BaseGenerator]] = {
"protobuf": ProtobufGenerator,
"proto": ProtobufGenerator, # Alias
"prisma": PrismaGenerator,
- "graphene": GrapheneGenerator,
+ "strawberry": StrawberryGenerator,
}
__all__ = [
"BaseGenerator",
"PydanticGenerator",
"DjangoGenerator",
- "GrapheneGenerator",
+ "StrawberryGenerator",
"TypeScriptGenerator",
"ProtobufGenerator",
"PrismaGenerator",
diff --git a/modelgen/generator/graphene.py b/modelgen/generator/strawberry.py
similarity index 62%
rename from modelgen/generator/graphene.py
rename to modelgen/generator/strawberry.py
index 503bbba..14ed78e 100644
--- a/modelgen/generator/graphene.py
+++ b/modelgen/generator/strawberry.py
@@ -1,28 +1,29 @@
"""
-Graphene Generator
+Strawberry Generator
-Generates graphene ObjectType and InputObjectType classes from model definitions.
+Generates strawberry type, input, and enum classes from model definitions.
Only generates type definitions — queries, mutations, and resolvers are hand-written.
"""
+import dataclasses as dc
from enum import Enum
from pathlib import Path
from typing import Any, List, get_type_hints
from ..helpers import get_origin_name, get_type_name, unwrap_optional
from ..loader.schema import EnumDefinition, FieldDefinition, ModelDefinition
-from ..types import GRAPHENE_RESOLVERS
+from ..types import STRAWBERRY_RESOLVERS
from .base import BaseGenerator
-class GrapheneGenerator(BaseGenerator):
- """Generates graphene type definition files."""
+class StrawberryGenerator(BaseGenerator):
+ """Generates strawberry type definition files."""
def file_extension(self) -> str:
return ".py"
def generate(self, models, output_path: Path) -> None:
- """Generate graphene types to output_path."""
+ """Generate strawberry types to output_path."""
output_path.parent.mkdir(parents=True, exist_ok=True)
if hasattr(models, "models"):
@@ -47,22 +48,18 @@ class GrapheneGenerator(BaseGenerator):
enums: List[EnumDefinition],
api_models: List[ModelDefinition],
) -> str:
- """Generate from ModelDefinition objects."""
lines = self._generate_header()
- # Generate enums as graphene.Enum
for enum_def in enums:
lines.extend(self._generate_enum(enum_def))
lines.append("")
lines.append("")
- # Generate domain models as ObjectType
for model_def in models:
lines.extend(self._generate_object_type(model_def))
lines.append("")
lines.append("")
- # Generate API models — request types as InputObjectType, others as ObjectType
for model_def in api_models:
if model_def.name.endswith("Request"):
lines.extend(self._generate_input_type(model_def))
@@ -74,7 +71,6 @@ class GrapheneGenerator(BaseGenerator):
return "\n".join(lines).rstrip() + "\n"
def _generate_from_dataclasses(self, dataclasses: List[type]) -> str:
- """Generate from Python dataclasses."""
lines = self._generate_header()
enums_generated = set()
@@ -99,37 +95,38 @@ class GrapheneGenerator(BaseGenerator):
def _generate_header(self) -> List[str]:
return [
'"""',
- "Graphene Types - GENERATED FILE",
+ "Strawberry Types - GENERATED FILE",
"",
"Do not edit directly. Regenerate using modelgen.",
'"""',
"",
- "import graphene",
+ "import strawberry",
+ "from enum import Enum",
+ "from typing import List, Optional",
+ "from uuid import UUID",
+ "from datetime import datetime",
+ "from strawberry.scalars import JSON",
"",
"",
]
def _generate_enum(self, enum_def: EnumDefinition) -> List[str]:
- """Generate graphene.Enum from EnumDefinition."""
- lines = [f"class {enum_def.name}(graphene.Enum):"]
+ lines = ["@strawberry.enum", f"class {enum_def.name}(Enum):"]
for name, value in enum_def.values:
lines.append(f' {name} = "{value}"')
return lines
def _generate_enum_from_python(self, enum_cls: type) -> List[str]:
- """Generate graphene.Enum from Python Enum."""
- lines = [f"class {enum_cls.__name__}(graphene.Enum):"]
+ lines = ["@strawberry.enum", f"class {enum_cls.__name__}(Enum):"]
for member in enum_cls:
lines.append(f' {member.name} = "{member.value}"')
return lines
def _generate_object_type(self, model_def: ModelDefinition) -> List[str]:
- """Generate graphene.ObjectType from ModelDefinition."""
name = model_def.name
- # Append Type suffix if not already present
type_name = f"{name}Type" if not name.endswith("Type") else name
- lines = [f"class {type_name}(graphene.ObjectType):"]
+ lines = ["@strawberry.type", f"class {type_name}:"]
if model_def.docstring:
doc = model_def.docstring.strip().split("\n")[0]
lines.append(f' """{doc}"""')
@@ -139,23 +136,19 @@ class GrapheneGenerator(BaseGenerator):
lines.append(" pass")
else:
for field in model_def.fields:
- graphene_type = self._resolve_type(field.type_hint, field.optional)
- lines.append(f" {field.name} = {graphene_type}")
+ type_str = self._resolve_type(field.type_hint, optional=True)
+ lines.append(f" {field.name}: {type_str} = None")
return lines
def _generate_input_type(self, model_def: ModelDefinition) -> List[str]:
- """Generate graphene.InputObjectType from ModelDefinition."""
- import dataclasses as dc
-
name = model_def.name
- # Convert FooRequest -> FooInput
if name.endswith("Request"):
input_name = name[: -len("Request")] + "Input"
else:
input_name = f"{name}Input"
- lines = [f"class {input_name}(graphene.InputObjectType):"]
+ lines = ["@strawberry.input", f"class {input_name}:"]
if model_def.docstring:
doc = model_def.docstring.strip().split("\n")[0]
lines.append(f' """{doc}"""')
@@ -164,73 +157,64 @@ class GrapheneGenerator(BaseGenerator):
if not model_def.fields:
lines.append(" pass")
else:
+ # Required fields first, then optional/defaulted
+ required = []
+ optional = []
for field in model_def.fields:
- graphene_type = self._resolve_type(field.type_hint, field.optional)
- # Required only if not optional AND no default value
has_default = field.default is not dc.MISSING
if not field.optional and not has_default:
- graphene_type = self._make_required(graphene_type)
- elif has_default and not field.optional:
- graphene_type = self._add_default(graphene_type, field.default)
- lines.append(f" {field.name} = {graphene_type}")
+ required.append(field)
+ else:
+ optional.append(field)
+
+ for field in required:
+ type_str = self._resolve_type(field.type_hint, optional=False)
+ lines.append(f" {field.name}: {type_str}")
+
+ for field in optional:
+ has_default = field.default is not dc.MISSING
+ if has_default and not callable(field.default):
+ type_str = self._resolve_type(field.type_hint, optional=False)
+ lines.append(f" {field.name}: {type_str} = {field.default!r}")
+ else:
+ type_str = self._resolve_type(field.type_hint, optional=True)
+ lines.append(f" {field.name}: {type_str} = None")
return lines
def _generate_object_type_from_dataclass(self, cls: type) -> List[str]:
- """Generate graphene.ObjectType from a dataclass."""
- import dataclasses as dc
-
type_name = f"{cls.__name__}Type"
- lines = [f"class {type_name}(graphene.ObjectType):"]
+ lines = ["@strawberry.type", f"class {type_name}:"]
hints = get_type_hints(cls)
for name, type_hint in hints.items():
if name.startswith("_"):
continue
- graphene_type = self._resolve_type(type_hint, False)
- lines.append(f" {name} = {graphene_type}")
+ type_str = self._resolve_type(type_hint, optional=True)
+ lines.append(f" {name}: {type_str} = None")
return lines
def _resolve_type(self, type_hint: Any, optional: bool) -> str:
- """Resolve Python type to graphene field call string."""
+ """Resolve Python type hint to a strawberry annotation string."""
base, is_optional = unwrap_optional(type_hint)
optional = optional or is_optional
origin = get_origin_name(base)
type_name = get_type_name(base)
- # Look up resolver
resolver = (
- GRAPHENE_RESOLVERS.get(origin)
- or GRAPHENE_RESOLVERS.get(type_name)
- or GRAPHENE_RESOLVERS.get(base)
+ STRAWBERRY_RESOLVERS.get(origin)
+ or STRAWBERRY_RESOLVERS.get(type_name)
+ or STRAWBERRY_RESOLVERS.get(base)
or (
- GRAPHENE_RESOLVERS["enum"]
+ STRAWBERRY_RESOLVERS["enum"]
if isinstance(base, type) and issubclass(base, Enum)
else None
)
)
- result = resolver(base) if resolver else "graphene.String"
+ inner = resolver(base) if resolver else "str"
- # List types already have () syntax from resolver
- if result.startswith("graphene.List("):
- return result
-
- # Scalar types: add () call
- return f"{result}()"
-
- def _make_required(self, field_str: str) -> str:
- """Add required=True to a graphene field."""
- if field_str.endswith("()"):
- return field_str[:-1] + "required=True)"
- return field_str
-
- def _add_default(self, field_str: str, default: Any) -> str:
- """Add default_value to a graphene field."""
- if callable(default):
- # default_factory — skip, graphene doesn't support factories
- return field_str
- if field_str.endswith("()"):
- return field_str[:-1] + f"default_value={default!r})"
- return field_str
+ if optional:
+ return f"Optional[{inner}]"
+ return inner
diff --git a/modelgen/types.py b/modelgen/types.py
index cf35e48..4a8d7f6 100644
--- a/modelgen/types.py
+++ b/modelgen/types.py
@@ -139,34 +139,34 @@ PRISMA_SPECIAL: dict[str, str] = {
}
# =============================================================================
-# Graphene Type Resolvers
+# Strawberry Type Resolvers
# =============================================================================
-def _resolve_graphene_list(base: Any) -> str:
- """Resolve graphene List type."""
+def _resolve_strawberry_list(base: Any) -> str:
+ """Resolve strawberry List type annotation."""
args = get_args(base)
if args:
inner = args[0]
if inner is str:
- return "graphene.List(graphene.String)"
+ return "List[str]"
elif inner is int:
- return "graphene.List(graphene.Int)"
+ return "List[int]"
elif inner is float:
- return "graphene.List(graphene.Float)"
+ return "List[float]"
elif inner is bool:
- return "graphene.List(graphene.Boolean)"
- return "graphene.List(graphene.String)"
+ return "List[bool]"
+ return "List[str]"
-GRAPHENE_RESOLVERS: dict[Any, Callable[[Any], str]] = {
- str: lambda _: "graphene.String",
- int: lambda _: "graphene.Int",
- float: lambda _: "graphene.Float",
- bool: lambda _: "graphene.Boolean",
- "UUID": lambda _: "graphene.UUID",
- "datetime": lambda _: "graphene.DateTime",
- "dict": lambda _: "graphene.JSONString",
- "list": _resolve_graphene_list,
- "enum": lambda base: f"graphene.String", # Enums exposed as strings in GQL
+STRAWBERRY_RESOLVERS: dict[Any, Callable[[Any], str]] = {
+ str: lambda _: "str",
+ int: lambda _: "int",
+ float: lambda _: "float",
+ bool: lambda _: "bool",
+ "UUID": lambda _: "UUID",
+ "datetime": lambda _: "datetime",
+ "dict": lambda _: "JSON",
+ "list": _resolve_strawberry_list,
+ "enum": lambda base: base.__name__,
}
diff --git a/requirements.txt b/requirements.txt
index 58d86e0..0bcac2d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,10 +21,13 @@ grpcio-tools>=1.60.0
# AWS
boto3>=1.34.0
+requests>=2.31.0
+
+# GCP (used only when MPR_EXECUTOR=gcp; imported lazily by GCPExecutor)
+google-cloud-run>=0.10.0
# GraphQL
-graphene>=3.3
-starlette-graphene3>=0.6.0
+strawberry-graphql[fastapi]>=0.311.0
# Testing
pytest>=7.4.0
diff --git a/task/executor.py b/task/executor.py
index 10f00c9..bad49ff 100644
--- a/task/executor.py
+++ b/task/executor.py
@@ -164,10 +164,84 @@ class LambdaExecutor(Executor):
return True
+class GCPExecutor(Executor):
+ """Execute jobs via Google Cloud Run Jobs."""
+
+ def __init__(self):
+ from google.cloud import run_v2
+
+ self.client = run_v2.JobsClient()
+ self.project_id = os.environ["GCP_PROJECT_ID"]
+ self.region = os.environ.get("GCP_REGION", "us-central1")
+ self.job_name = os.environ["CLOUD_RUN_JOB"]
+ self.callback_url = os.environ.get("CALLBACK_URL", "")
+ self.callback_api_key = os.environ.get("CALLBACK_API_KEY", "")
+
+ def run(
+ self,
+ job_id: str,
+ source_path: str,
+ output_path: str,
+ preset: Optional[Dict[str, Any]] = None,
+ trim_start: Optional[float] = None,
+ trim_end: Optional[float] = None,
+ duration: Optional[float] = None,
+ progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
+ ) -> bool:
+ """Trigger a Cloud Run Job execution for this job."""
+ import json
+
+ from google.cloud import run_v2
+
+ payload = {
+ "job_id": job_id,
+ "source_key": source_path,
+ "output_key": output_path,
+ "preset": preset,
+ "trim_start": trim_start,
+ "trim_end": trim_end,
+ "duration": duration,
+ "callback_url": self.callback_url,
+ "api_key": self.callback_api_key,
+ }
+
+ job_path = (
+ f"projects/{self.project_id}/locations/{self.region}/jobs/{self.job_name}"
+ )
+
+ request = run_v2.RunJobRequest(
+ name=job_path,
+ overrides=run_v2.RunJobRequest.Overrides(
+ container_overrides=[
+ run_v2.RunJobRequest.Overrides.ContainerOverride(
+ env=[
+ run_v2.EnvVar(
+ name="MPR_JOB_PAYLOAD", value=json.dumps(payload)
+ )
+ ]
+ )
+ ]
+ ),
+ )
+
+ operation = self.client.run_job(request=request)
+ execution_name = operation.metadata.name
+
+ try:
+ from mpr.media_assets.models import TranscodeJob
+
+ TranscodeJob.objects.filter(id=job_id).update(execution_arn=execution_name)
+ except Exception:
+ pass
+
+ return True
+
+
# Executor registry
_executors: Dict[str, type] = {
"local": LocalExecutor,
"lambda": LambdaExecutor,
+ "gcp": GCPExecutor,
}
_executor_instance: Optional[Executor] = None
diff --git a/task/gcp_handler.py b/task/gcp_handler.py
new file mode 100644
index 0000000..c640e90
--- /dev/null
+++ b/task/gcp_handler.py
@@ -0,0 +1,121 @@
+"""
+Google Cloud Run Job handler for media transcoding.
+
+Reads job payload from the MPR_JOB_PAYLOAD env var (injected by GCPExecutor),
+downloads source from S3-compatible storage (GCS via HMAC + S3 API),
+runs FFmpeg, uploads result, and calls back to the API.
+
+Uses core/storage and core/ffmpeg — same modules as the Celery worker.
+No cloud-provider SDK required here; storage goes through core.storage (boto3 + S3 compat).
+
+Entry point: python -m task.gcp_handler (set as Cloud Run Job command)
+"""
+
+import json
+import logging
+import os
+import sys
+import tempfile
+from pathlib import Path
+
+import requests
+
+from core.ffmpeg.transcode import TranscodeConfig, transcode
+from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def main() -> None:
+ raw = os.environ.get("MPR_JOB_PAYLOAD")
+ if not raw:
+ logger.error("MPR_JOB_PAYLOAD not set")
+ sys.exit(1)
+
+ event = json.loads(raw)
+ job_id = event["job_id"]
+ source_key = event["source_key"]
+ output_key = event["output_key"]
+ preset = event.get("preset")
+ trim_start = event.get("trim_start")
+ trim_end = event.get("trim_end")
+ duration = event.get("duration")
+ callback_url = event.get("callback_url", "")
+ api_key = event.get("api_key", "")
+
+ logger.info(f"Starting job {job_id}: {source_key} -> {output_key}")
+
+ tmp_source = download_to_temp(BUCKET_IN, source_key)
+ ext_out = Path(output_key).suffix or ".mp4"
+ fd, tmp_output = tempfile.mkstemp(suffix=ext_out)
+ os.close(fd)
+
+ try:
+ if preset:
+ config = TranscodeConfig(
+ input_path=tmp_source,
+ output_path=tmp_output,
+ video_codec=preset.get("video_codec", "libx264"),
+ video_bitrate=preset.get("video_bitrate"),
+ video_crf=preset.get("video_crf"),
+ video_preset=preset.get("video_preset"),
+ resolution=preset.get("resolution"),
+ framerate=preset.get("framerate"),
+ audio_codec=preset.get("audio_codec", "aac"),
+ audio_bitrate=preset.get("audio_bitrate"),
+ audio_channels=preset.get("audio_channels"),
+ audio_samplerate=preset.get("audio_samplerate"),
+ container=preset.get("container", "mp4"),
+ extra_args=preset.get("extra_args", []),
+ trim_start=trim_start,
+ trim_end=trim_end,
+ )
+ else:
+ config = TranscodeConfig(
+ input_path=tmp_source,
+ output_path=tmp_output,
+ video_codec="copy",
+ audio_codec="copy",
+ trim_start=trim_start,
+ trim_end=trim_end,
+ )
+
+ success = transcode(config, duration=duration)
+ if not success:
+ raise RuntimeError("Transcode returned False")
+
+ logger.info(f"Uploading to {BUCKET_OUT}/{output_key}")
+ upload_file(tmp_output, BUCKET_OUT, output_key)
+
+ _callback(callback_url, job_id, api_key, {"status": "completed"})
+ logger.info(f"Job {job_id} completed")
+ sys.exit(0)
+
+ except Exception as e:
+ logger.exception(f"Job {job_id} failed: {e}")
+ _callback(callback_url, job_id, api_key, {"status": "failed", "error": str(e)})
+ sys.exit(1)
+
+ finally:
+ for f in [tmp_source, tmp_output]:
+ try:
+ os.unlink(f)
+ except OSError:
+ pass
+
+
+def _callback(callback_url: str, job_id: str, api_key: str, payload: dict) -> None:
+ if not callback_url:
+ return
+ try:
+ url = f"{callback_url}/jobs/{job_id}/callback"
+ headers = {"X-API-Key": api_key} if api_key else {}
+ resp = requests.post(url, json=payload, headers=headers, timeout=10)
+ logger.info(f"Callback response: {resp.status_code}")
+ except Exception as e:
+ logger.warning(f"Callback failed: {e}")
+
+
+if __name__ == "__main__":
+ main()