#!/usr/bin/env python3 """ MPR Model Generator Generates framework-specific models from schema/models/: - Django ORM models -> mpr/media_assets/models.py - Pydantic schemas -> api/schemas/*.py - TypeScript types -> ui/timeline/src/types.ts - Protobuf -> grpc/protos/worker.proto Usage: python schema/generate.py [--django] [--pydantic] [--typescript] [--proto] [--all] """ import argparse import dataclasses as dc import subprocess import sys from enum import Enum from pathlib import Path from typing import Any, Callable, Union, get_args, get_origin, get_type_hints PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) from schema.models import API_MODELS, DATACLASSES, ENUMS, GRPC_MESSAGES, GRPC_SERVICE # ============================================================================= # Type Dispatch Tables # ============================================================================= DJANGO_TYPES: dict[Any, str] = { str: "models.CharField(max_length={max_length}{opts})", int: "models.IntegerField({opts})", float: "models.FloatField({opts})", bool: "models.BooleanField(default={default})", "UUID": "models.UUIDField({opts})", "datetime": "models.DateTimeField({opts})", "dict": "models.JSONField(default=dict, blank=True)", "list": "models.JSONField(default=list, blank=True)", "text": "models.TextField(blank=True, default='')", "bigint": "models.BigIntegerField({opts})", "enum": "models.CharField(max_length=20, choices=Status.choices{opts})", } DJANGO_SPECIAL: dict[str, str] = { "id": "models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)", "created_at": "models.DateTimeField(auto_now_add=True)", "updated_at": "models.DateTimeField(auto_now=True)", } PYDANTIC_RESOLVERS: dict[Any, Callable[[Any], str]] = { str: lambda _: "str", int: lambda _: "int", float: lambda _: "float", bool: lambda _: "bool", "UUID": lambda _: "UUID", "datetime": lambda _: "datetime", "dict": lambda _: "Dict[str, Any]", "list": lambda base: f"List[{get_list_inner(base)}]", "enum": lambda base: base.__name__, } TS_RESOLVERS: dict[Any, Callable[[Any], str]] = { str: lambda _: "string", int: lambda _: "number", float: lambda _: "number", bool: lambda _: "boolean", "UUID": lambda _: "string", "datetime": lambda _: "string", "dict": lambda _: "Record", "list": lambda base: f"{TS_RESOLVERS.get(get_args(base)[0], lambda _: 'string')(None)}[]" if get_args(base) else "string[]", "enum": lambda base: base.__name__, } PROTO_RESOLVERS: dict[Any, Callable[[Any], str]] = { str: lambda _: "string", int: lambda _: "int32", float: lambda _: "float", bool: lambda _: "bool", "list": lambda base: f"repeated {PROTO_RESOLVERS.get(get_args(base)[0], lambda _: 'string')(None)}" if get_args(base) else "repeated string", } # ============================================================================= # Type Helpers # ============================================================================= def unwrap_optional(type_hint: Any) -> tuple[Any, bool]: """Unwrap Optional[T] -> (T, True) or (T, False) if not optional.""" origin = get_origin(type_hint) if origin is Union: args = [a for a in get_args(type_hint) if a is not type(None)] return (args[0] if args else str, True) return (type_hint, False) def get_origin_name(type_hint: Any) -> str | None: """Get origin type name: 'dict', 'list', or None.""" origin = get_origin(type_hint) if origin is dict: return "dict" if origin is list: return "list" return None def get_type_name(type_hint: Any) -> str | None: """Get type name for special types like UUID, datetime.""" if hasattr(type_hint, "__name__"): return type_hint.__name__ return None def get_list_inner(type_hint: Any) -> str: """Get inner type of List[T].""" args = get_args(type_hint) if args and args[0] in (str, int, float, bool): return {str: "str", int: "int", float: "float", bool: "bool"}[args[0]] return "str" def get_field_default(field: dc.Field) -> Any: """Get default value from dataclass field.""" if field.default is not dc.MISSING: return field.default return dc.MISSING def format_opts(optional: bool, extra: list[str] | None = None) -> str: """Format field options string.""" parts = [] if optional: parts.append("null=True, blank=True") if extra: parts.extend(extra) return ", ".join(parts) # ============================================================================= # Django Generator # ============================================================================= def resolve_django_type(name: str, type_hint: Any, default: Any) -> str: """Resolve Python type to Django field.""" # Special fields if name in DJANGO_SPECIAL: return DJANGO_SPECIAL[name] base, optional = unwrap_optional(type_hint) origin = get_origin_name(base) type_name = get_type_name(base) opts = format_opts(optional) # Container types if origin == "dict": return DJANGO_TYPES["dict"] if origin == "list": return DJANGO_TYPES["list"] # UUID / datetime if type_name == "UUID": return DJANGO_TYPES["UUID"].format(opts=opts) if type_name == "datetime": return DJANGO_TYPES["datetime"].format(opts=opts) # Enum if isinstance(base, type) and issubclass(base, Enum): extra = [] if optional: extra.append("null=True, blank=True") if default is not dc.MISSING and isinstance(default, Enum): extra.append(f"default=Status.{default.name}") return DJANGO_TYPES["enum"].format( opts=", " + ", ".join(extra) if extra else "" ) # Text fields if base is str and any(x in name for x in ("message", "comments", "description")): return DJANGO_TYPES["text"] # BigInt fields if base is int and name in ("file_size", "bitrate"): return DJANGO_TYPES["bigint"].format(opts=opts) # Basic types if base is str: max_length = 1000 if "path" in name else 500 if "filename" in name else 255 return DJANGO_TYPES[str].format( max_length=max_length, opts=", " + opts if opts else "" ) if base is int: extra = [opts] if opts else [] if default is not dc.MISSING and not callable(default): extra.append(f"default={default}") return DJANGO_TYPES[int].format(opts=", ".join(extra)) if base is float: extra = [opts] if opts else [] if default is not dc.MISSING and not callable(default): extra.append(f"default={default}") return DJANGO_TYPES[float].format(opts=", ".join(extra)) if base is bool: default_val = default if default is not dc.MISSING else False return DJANGO_TYPES[bool].format(default=default_val) # Fallback return DJANGO_TYPES[str].format(max_length=255, opts=", " + opts if opts else "") def generate_django_model(cls: type) -> list[str]: """Generate Django model lines from dataclass.""" lines = [ f"class {cls.__name__}(models.Model):", f' """{(cls.__doc__ or cls.__name__).strip().split(chr(10))[0]}"""', "", ] hints = get_type_hints(cls) fields = {f.name: f for f in dc.fields(cls)} # Add Status inner class for enum fields for type_hint in hints.values(): base, _ = unwrap_optional(type_hint) if isinstance(base, type) and issubclass(base, Enum): lines.append(" class Status(models.TextChoices):") for member in base: label = member.name.replace("_", " ").title() lines.append(f' {member.name} = "{member.value}", "{label}"') lines.append("") break # Fields for name, type_hint in hints.items(): if name.startswith("_"): continue field = fields.get(name) default = get_field_default(field) if field else dc.MISSING django_field = resolve_django_type(name, type_hint, default) lines.append(f" {name} = {django_field}") # Meta and __str__ lines.extend( [ "", " class Meta:", ' ordering = ["-created_at"]', "", " def __str__(self):", ] ) if "filename" in hints: lines.append(" return self.filename") elif "name" in hints: lines.append(" return self.name") else: lines.append(" return str(self.id)") return lines def generate_django() -> str: """Generate complete Django models file.""" header = [ '"""', "Django ORM Models - GENERATED FILE", "", "Do not edit directly. Modify schema/models/*.py and run:", " python schema/generate.py --django", '"""', "", "import uuid", "from django.db import models", "", ] body = [] for cls in DATACLASSES: body.extend(generate_django_model(cls)) body.extend(["", ""]) return "\n".join(header + body) # ============================================================================= # Pydantic Generator # ============================================================================= def resolve_pydantic_type(type_hint: Any) -> str: """Resolve Python type to Pydantic type string.""" base, optional = unwrap_optional(type_hint) origin = get_origin_name(base) type_name = get_type_name(base) # Look up resolver by origin, type name, base type, or enum resolver = ( PYDANTIC_RESOLVERS.get(origin) or PYDANTIC_RESOLVERS.get(type_name) or PYDANTIC_RESOLVERS.get(base) or ( PYDANTIC_RESOLVERS["enum"] if isinstance(base, type) and issubclass(base, Enum) else None ) ) result = resolver(base) if resolver else "str" return f"Optional[{result}]" if optional else result def generate_pydantic_schema(cls: type, suffix: str) -> list[str]: """Generate Pydantic schema lines from dataclass.""" name = cls.__name__.replace("Transcode", "").replace("Media", "") class_name = f"{name}{suffix}" skip_fields = { "Create": {"id", "created_at", "updated_at", "status", "error_message"}, "Update": {"id", "created_at", "updated_at"}, "Response": set(), } lines = [ f"class {class_name}(BaseSchema):", f' """{class_name} schema."""', ] hints = get_type_hints(cls) fields = {f.name: f for f in dc.fields(cls)} for name, type_hint in hints.items(): if name.startswith("_") or name in skip_fields.get(suffix, set()): continue py_type = resolve_pydantic_type(type_hint) # Update schemas: all fields optional if suffix == "Update" and "Optional" not in py_type: py_type = f"Optional[{py_type}]" field = fields.get(name) default = get_field_default(field) if field else dc.MISSING if "Optional" in py_type: lines.append(f" {name}: {py_type} = None") elif default is not dc.MISSING and not callable(default): if isinstance(default, str): lines.append(f' {name}: {py_type} = "{default}"') elif isinstance(default, Enum): lines.append( f" {name}: {py_type} = {default.__class__.__name__}.{default.name}" ) else: lines.append(f" {name}: {py_type} = {default!r}") else: lines.append(f" {name}: {py_type}") return lines def generate_pydantic() -> dict[str, str]: """Generate all Pydantic schema files.""" files = {} # base.py files["base.py"] = "\n".join( [ '"""Pydantic Base Schema - GENERATED FILE"""', "", "from pydantic import BaseModel, ConfigDict", "", "", "class BaseSchema(BaseModel):", ' """Base schema with ORM mode."""', " model_config = ConfigDict(from_attributes=True)", "", ] ) # Schema files per model for cls in DATACLASSES: module_name = cls.__name__.replace("Transcode", "").replace("Media", "").lower() lines = [ f'"""{cls.__name__} Schemas - GENERATED FILE"""', "", "from datetime import datetime", "from enum import Enum", "from typing import Any, Dict, List, Optional", "from uuid import UUID", "", "from .base import BaseSchema", "", ] # Add enum if present hints = get_type_hints(cls) for type_hint in hints.values(): base, _ = unwrap_optional(type_hint) if isinstance(base, type) and issubclass(base, Enum): lines.extend( [ "", f"class {base.__name__}(str, Enum):", ] ) for m in base: lines.append(f' {m.name} = "{m.value}"') lines.append("") break # Schemas for suffix in ["Create", "Update", "Response"]: lines.append("") lines.extend(generate_pydantic_schema(cls, suffix)) lines.append("") files[f"{module_name}.py"] = "\n".join(lines) # __init__.py imports = ["from .base import BaseSchema"] all_exports = ['"BaseSchema"'] for cls in DATACLASSES: name = cls.__name__.replace("Transcode", "").replace("Media", "") module = name.lower() imports.append( f"from .{module} import {name}Create, {name}Update, {name}Response" ) all_exports.extend([f'"{name}Create"', f'"{name}Update"', f'"{name}Response"']) # Add enum export hints = get_type_hints(cls) for type_hint in hints.values(): base, _ = unwrap_optional(type_hint) if isinstance(base, type) and issubclass(base, Enum): imports.append(f"from .{module} import {base.__name__}") all_exports.append(f'"{base.__name__}"') break files["__init__.py"] = "\n".join( [ '"""API Schemas - GENERATED FILE"""', "", *imports, "", f"__all__ = [{', '.join(all_exports)}]", "", ] ) return files # ============================================================================= # TypeScript Generator # ============================================================================= def resolve_ts_type(type_hint: Any) -> str: """Resolve Python type to TypeScript type string.""" base, optional = unwrap_optional(type_hint) origin = get_origin_name(base) type_name = get_type_name(base) # Look up resolver by origin, type name, base type, or enum resolver = ( TS_RESOLVERS.get(origin) or TS_RESOLVERS.get(type_name) or TS_RESOLVERS.get(base) or ( TS_RESOLVERS["enum"] if isinstance(base, type) and issubclass(base, Enum) else None ) ) result = resolver(base) if resolver else "string" return f"{result} | null" if optional else result def generate_ts_interface(cls: type) -> list[str]: """Generate TypeScript interface lines from dataclass.""" lines = [f"export interface {cls.__name__} {{"] for name, type_hint in get_type_hints(cls).items(): if name.startswith("_"): continue ts_type = resolve_ts_type(type_hint) lines.append(f" {name}: {ts_type};") lines.append("}") return lines def generate_typescript() -> str: """Generate complete TypeScript file.""" lines = [ "/**", " * MPR TypeScript Types - GENERATED FILE", " *", " * Do not edit directly. Modify schema/models/*.py and run:", " * python schema/generate.py --typescript", " */", "", ] # Enums as union types for enum in ENUMS: values = " | ".join(f'"{m.value}"' for m in enum) lines.append(f"export type {enum.__name__} = {values};") lines.append("") # Interfaces - domain models for cls in DATACLASSES: lines.extend(generate_ts_interface(cls)) lines.append("") # Interfaces - API request/response models lines.append("// API Request/Response Types") lines.append("") for cls in API_MODELS: lines.extend(generate_ts_interface(cls)) lines.append("") return "\n".join(lines) # ============================================================================= # Proto Generator # ============================================================================= def resolve_proto_type(type_hint: Any) -> tuple[str, bool]: """Resolve Python type to proto type. Returns (type, is_optional).""" base, optional = unwrap_optional(type_hint) origin = get_origin_name(base) # Look up resolver by origin or base type resolver = PROTO_RESOLVERS.get(origin) or PROTO_RESOLVERS.get(base) if resolver: result = resolver(base) is_repeated = result.startswith("repeated") return result, optional and not is_repeated return "string", optional def generate_proto_message(cls: type) -> list[str]: """Generate proto message lines from dataclass.""" lines = [f"message {cls.__name__} {{"] hints = get_type_hints(cls) if not hints: lines.append(" // Empty") else: for i, (name, type_hint) in enumerate(hints.items(), 1): proto_type, optional = resolve_proto_type(type_hint) prefix = ( "optional " if optional and not proto_type.startswith("repeated") else "" ) lines.append(f" {prefix}{proto_type} {name} = {i};") lines.append("}") return lines def generate_proto() -> str: """Generate complete proto file.""" lines = [ "// MPR Worker Service - GENERATED FILE", "//", "// Do not edit directly. Modify schema/models/grpc.py and run:", "// python schema/generate.py --proto", "", 'syntax = "proto3";', "", f"package {GRPC_SERVICE['package']};", "", f"service {GRPC_SERVICE['name']} {{", ] # Methods for m in GRPC_SERVICE["methods"]: req = m["request"].__name__ resp = m["response"].__name__ returns = f"stream {resp}" if m["stream_response"] else resp lines.append(f" rpc {m['name']}({req}) returns ({returns});") lines.extend(["}", ""]) # Messages for cls in GRPC_MESSAGES: lines.extend(generate_proto_message(cls)) lines.append("") return "\n".join(lines) # ============================================================================= # Writers # ============================================================================= def write_file(path: Path, content: str) -> None: """Write content to file, creating directories as needed.""" path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content) print(f" {path}") def write_django(output_dir: Path) -> None: """Write Django models.""" write_file(output_dir / "mpr" / "media_assets" / "models.py", generate_django()) def write_pydantic(output_dir: Path) -> None: """Write Pydantic schemas.""" schemas_dir = output_dir / "api" / "schemas" for filename, content in generate_pydantic().items(): write_file(schemas_dir / filename, content) def write_typescript(output_dir: Path) -> None: """Write TypeScript types.""" write_file( output_dir / "ui" / "timeline" / "src" / "types.ts", generate_typescript() ) def write_proto(output_dir: Path) -> None: """Write proto and generate stubs.""" proto_dir = output_dir / "grpc" / "protos" proto_path = proto_dir / "worker.proto" write_file(proto_path, generate_proto()) # Generate Python stubs grpc_dir = output_dir / "grpc" result = subprocess.run( [ sys.executable, "-m", "grpc_tools.protoc", f"-I{proto_dir}", f"--python_out={grpc_dir}", f"--grpc_python_out={grpc_dir}", str(proto_path), ], capture_output=True, text=True, ) if result.returncode == 0: print(f" {grpc_dir}/worker_pb2.py") print(f" {grpc_dir}/worker_pb2_grpc.py") else: print(" Warning: grpc_tools failed - pip install grpcio-tools") # ============================================================================= # Main # ============================================================================= def main() -> None: parser = argparse.ArgumentParser(description="Generate from schema") parser.add_argument("--django", action="store_true") parser.add_argument("--pydantic", action="store_true") parser.add_argument("--typescript", action="store_true") parser.add_argument("--proto", action="store_true") parser.add_argument("--all", action="store_true") parser.add_argument("--output", type=Path, default=PROJECT_ROOT) args = parser.parse_args() if not any([args.django, args.pydantic, args.typescript, args.proto, args.all]): args.all = True print(f"Generating to {args.output}\n") targets: list[tuple[bool, str, Callable]] = [ (args.django or args.all, "Django", write_django), (args.pydantic or args.all, "Pydantic", write_pydantic), (args.typescript or args.all, "TypeScript", write_typescript), (args.proto or args.all, "Proto", write_proto), ] for enabled, name, writer in targets: if enabled: print(f"{name}:") writer(args.output) print() print("Done!") if __name__ == "__main__": main()