django and fastapi apps
This commit is contained in:
702
schema/generate.py
Executable file
702
schema/generate.py
Executable file
@@ -0,0 +1,702 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
MPR Model Generator
|
||||
|
||||
Generates framework-specific models from schema/models/:
|
||||
- Django ORM models -> mpr/media_assets/models.py
|
||||
- Pydantic schemas -> api/schemas/*.py
|
||||
- TypeScript types -> ui/timeline/src/types.ts
|
||||
- Protobuf -> grpc/protos/worker.proto
|
||||
|
||||
Usage:
|
||||
python schema/generate.py [--django] [--pydantic] [--typescript] [--proto] [--all]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import dataclasses as dc
|
||||
import subprocess
|
||||
import sys
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Union, get_args, get_origin, get_type_hints
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
from schema.models import DATACLASSES, ENUMS, GRPC_MESSAGES, GRPC_SERVICE
|
||||
|
||||
# =============================================================================
|
||||
# Type Dispatch Tables
|
||||
# =============================================================================
|
||||
|
||||
DJANGO_TYPES: dict[Any, str] = {
|
||||
str: "models.CharField(max_length={max_length}{opts})",
|
||||
int: "models.IntegerField({opts})",
|
||||
float: "models.FloatField({opts})",
|
||||
bool: "models.BooleanField(default={default})",
|
||||
"UUID": "models.UUIDField({opts})",
|
||||
"datetime": "models.DateTimeField({opts})",
|
||||
"dict": "models.JSONField(default=dict, blank=True)",
|
||||
"list": "models.JSONField(default=list, blank=True)",
|
||||
"text": "models.TextField(blank=True, default='')",
|
||||
"bigint": "models.BigIntegerField({opts})",
|
||||
"enum": "models.CharField(max_length=20, choices=Status.choices{opts})",
|
||||
}
|
||||
|
||||
DJANGO_SPECIAL: dict[str, str] = {
|
||||
"id": "models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)",
|
||||
"created_at": "models.DateTimeField(auto_now_add=True)",
|
||||
"updated_at": "models.DateTimeField(auto_now=True)",
|
||||
}
|
||||
|
||||
PYDANTIC_RESOLVERS: dict[Any, Callable[[Any], str]] = {
|
||||
str: lambda _: "str",
|
||||
int: lambda _: "int",
|
||||
float: lambda _: "float",
|
||||
bool: lambda _: "bool",
|
||||
"UUID": lambda _: "UUID",
|
||||
"datetime": lambda _: "datetime",
|
||||
"dict": lambda _: "Dict[str, Any]",
|
||||
"list": lambda base: f"List[{get_list_inner(base)}]",
|
||||
"enum": lambda base: base.__name__,
|
||||
}
|
||||
|
||||
TS_RESOLVERS: dict[Any, Callable[[Any], str]] = {
|
||||
str: lambda _: "string",
|
||||
int: lambda _: "number",
|
||||
float: lambda _: "number",
|
||||
bool: lambda _: "boolean",
|
||||
"UUID": lambda _: "string",
|
||||
"datetime": lambda _: "string",
|
||||
"dict": lambda _: "Record<string, unknown>",
|
||||
"list": lambda base: f"{TS_RESOLVERS.get(get_args(base)[0], lambda _: 'string')(None)}[]"
|
||||
if get_args(base)
|
||||
else "string[]",
|
||||
"enum": lambda base: base.__name__,
|
||||
}
|
||||
|
||||
PROTO_RESOLVERS: dict[Any, Callable[[Any], str]] = {
|
||||
str: lambda _: "string",
|
||||
int: lambda _: "int32",
|
||||
float: lambda _: "float",
|
||||
bool: lambda _: "bool",
|
||||
"list": lambda base: f"repeated {PROTO_RESOLVERS.get(get_args(base)[0], lambda _: 'string')(None)}"
|
||||
if get_args(base)
|
||||
else "repeated string",
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Type Helpers
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def unwrap_optional(type_hint: Any) -> tuple[Any, bool]:
|
||||
"""Unwrap Optional[T] -> (T, True) or (T, False) if not optional."""
|
||||
origin = get_origin(type_hint)
|
||||
if origin is Union:
|
||||
args = [a for a in get_args(type_hint) if a is not type(None)]
|
||||
return (args[0] if args else str, True)
|
||||
return (type_hint, False)
|
||||
|
||||
|
||||
def get_origin_name(type_hint: Any) -> str | None:
|
||||
"""Get origin type name: 'dict', 'list', or None."""
|
||||
origin = get_origin(type_hint)
|
||||
if origin is dict:
|
||||
return "dict"
|
||||
if origin is list:
|
||||
return "list"
|
||||
return None
|
||||
|
||||
|
||||
def get_type_name(type_hint: Any) -> str | None:
|
||||
"""Get type name for special types like UUID, datetime."""
|
||||
if hasattr(type_hint, "__name__"):
|
||||
return type_hint.__name__
|
||||
return None
|
||||
|
||||
|
||||
def get_list_inner(type_hint: Any) -> str:
|
||||
"""Get inner type of List[T]."""
|
||||
args = get_args(type_hint)
|
||||
if args and args[0] in (str, int, float, bool):
|
||||
return {str: "str", int: "int", float: "float", bool: "bool"}[args[0]]
|
||||
return "str"
|
||||
|
||||
|
||||
def get_field_default(field: dc.Field) -> Any:
|
||||
"""Get default value from dataclass field."""
|
||||
if field.default is not dc.MISSING:
|
||||
return field.default
|
||||
return dc.MISSING
|
||||
|
||||
|
||||
def format_opts(optional: bool, extra: list[str] | None = None) -> str:
|
||||
"""Format field options string."""
|
||||
parts = []
|
||||
if optional:
|
||||
parts.append("null=True, blank=True")
|
||||
if extra:
|
||||
parts.extend(extra)
|
||||
return ", ".join(parts)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Django Generator
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def resolve_django_type(name: str, type_hint: Any, default: Any) -> str:
|
||||
"""Resolve Python type to Django field."""
|
||||
# Special fields
|
||||
if name in DJANGO_SPECIAL:
|
||||
return DJANGO_SPECIAL[name]
|
||||
|
||||
base, optional = unwrap_optional(type_hint)
|
||||
origin = get_origin_name(base)
|
||||
type_name = get_type_name(base)
|
||||
opts = format_opts(optional)
|
||||
|
||||
# Container types
|
||||
if origin == "dict":
|
||||
return DJANGO_TYPES["dict"]
|
||||
if origin == "list":
|
||||
return DJANGO_TYPES["list"]
|
||||
|
||||
# UUID / datetime
|
||||
if type_name == "UUID":
|
||||
return DJANGO_TYPES["UUID"].format(opts=opts)
|
||||
if type_name == "datetime":
|
||||
return DJANGO_TYPES["datetime"].format(opts=opts)
|
||||
|
||||
# Enum
|
||||
if isinstance(base, type) and issubclass(base, Enum):
|
||||
extra = []
|
||||
if optional:
|
||||
extra.append("null=True, blank=True")
|
||||
if default is not dc.MISSING and isinstance(default, Enum):
|
||||
extra.append(f"default=Status.{default.name}")
|
||||
return DJANGO_TYPES["enum"].format(
|
||||
opts=", " + ", ".join(extra) if extra else ""
|
||||
)
|
||||
|
||||
# Text fields
|
||||
if base is str and any(x in name for x in ("message", "comments", "description")):
|
||||
return DJANGO_TYPES["text"]
|
||||
|
||||
# BigInt fields
|
||||
if base is int and name in ("file_size", "bitrate"):
|
||||
return DJANGO_TYPES["bigint"].format(opts=opts)
|
||||
|
||||
# Basic types
|
||||
if base is str:
|
||||
max_length = 1000 if "path" in name else 500 if "filename" in name else 255
|
||||
return DJANGO_TYPES[str].format(
|
||||
max_length=max_length, opts=", " + opts if opts else ""
|
||||
)
|
||||
|
||||
if base is int:
|
||||
extra = [opts] if opts else []
|
||||
if default is not dc.MISSING and not callable(default):
|
||||
extra.append(f"default={default}")
|
||||
return DJANGO_TYPES[int].format(opts=", ".join(extra))
|
||||
|
||||
if base is float:
|
||||
extra = [opts] if opts else []
|
||||
if default is not dc.MISSING and not callable(default):
|
||||
extra.append(f"default={default}")
|
||||
return DJANGO_TYPES[float].format(opts=", ".join(extra))
|
||||
|
||||
if base is bool:
|
||||
default_val = default if default is not dc.MISSING else False
|
||||
return DJANGO_TYPES[bool].format(default=default_val)
|
||||
|
||||
# Fallback
|
||||
return DJANGO_TYPES[str].format(max_length=255, opts=", " + opts if opts else "")
|
||||
|
||||
|
||||
def generate_django_model(cls: type) -> list[str]:
|
||||
"""Generate Django model lines from dataclass."""
|
||||
lines = [
|
||||
f"class {cls.__name__}(models.Model):",
|
||||
f' """{(cls.__doc__ or cls.__name__).strip().split(chr(10))[0]}"""',
|
||||
"",
|
||||
]
|
||||
|
||||
hints = get_type_hints(cls)
|
||||
fields = {f.name: f for f in dc.fields(cls)}
|
||||
|
||||
# Add Status inner class for enum fields
|
||||
for type_hint in hints.values():
|
||||
base, _ = unwrap_optional(type_hint)
|
||||
if isinstance(base, type) and issubclass(base, Enum):
|
||||
lines.append(" class Status(models.TextChoices):")
|
||||
for member in base:
|
||||
label = member.name.replace("_", " ").title()
|
||||
lines.append(f' {member.name} = "{member.value}", "{label}"')
|
||||
lines.append("")
|
||||
break
|
||||
|
||||
# Fields
|
||||
for name, type_hint in hints.items():
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
field = fields.get(name)
|
||||
default = get_field_default(field) if field else dc.MISSING
|
||||
django_field = resolve_django_type(name, type_hint, default)
|
||||
lines.append(f" {name} = {django_field}")
|
||||
|
||||
# Meta and __str__
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
" class Meta:",
|
||||
' ordering = ["-created_at"]',
|
||||
"",
|
||||
" def __str__(self):",
|
||||
]
|
||||
)
|
||||
|
||||
if "filename" in hints:
|
||||
lines.append(" return self.filename")
|
||||
elif "name" in hints:
|
||||
lines.append(" return self.name")
|
||||
else:
|
||||
lines.append(" return str(self.id)")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
def generate_django() -> str:
|
||||
"""Generate complete Django models file."""
|
||||
header = [
|
||||
'"""',
|
||||
"Django ORM Models - GENERATED FILE",
|
||||
"",
|
||||
"Do not edit directly. Modify schema/models/*.py and run:",
|
||||
" python schema/generate.py --django",
|
||||
'"""',
|
||||
"",
|
||||
"import uuid",
|
||||
"from django.db import models",
|
||||
"",
|
||||
]
|
||||
|
||||
body = []
|
||||
for cls in DATACLASSES:
|
||||
body.extend(generate_django_model(cls))
|
||||
body.extend(["", ""])
|
||||
|
||||
return "\n".join(header + body)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Pydantic Generator
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def resolve_pydantic_type(type_hint: Any) -> str:
|
||||
"""Resolve Python type to Pydantic type string."""
|
||||
base, optional = unwrap_optional(type_hint)
|
||||
origin = get_origin_name(base)
|
||||
type_name = get_type_name(base)
|
||||
|
||||
# Look up resolver by origin, type name, base type, or enum
|
||||
resolver = (
|
||||
PYDANTIC_RESOLVERS.get(origin)
|
||||
or PYDANTIC_RESOLVERS.get(type_name)
|
||||
or PYDANTIC_RESOLVERS.get(base)
|
||||
or (
|
||||
PYDANTIC_RESOLVERS["enum"]
|
||||
if isinstance(base, type) and issubclass(base, Enum)
|
||||
else None
|
||||
)
|
||||
)
|
||||
|
||||
result = resolver(base) if resolver else "str"
|
||||
return f"Optional[{result}]" if optional else result
|
||||
|
||||
|
||||
def generate_pydantic_schema(cls: type, suffix: str) -> list[str]:
|
||||
"""Generate Pydantic schema lines from dataclass."""
|
||||
name = cls.__name__.replace("Transcode", "").replace("Media", "")
|
||||
class_name = f"{name}{suffix}"
|
||||
|
||||
skip_fields = {
|
||||
"Create": {"id", "created_at", "updated_at", "status", "error_message"},
|
||||
"Update": {"id", "created_at", "updated_at"},
|
||||
"Response": set(),
|
||||
}
|
||||
|
||||
lines = [
|
||||
f"class {class_name}(BaseSchema):",
|
||||
f' """{class_name} schema."""',
|
||||
]
|
||||
|
||||
hints = get_type_hints(cls)
|
||||
fields = {f.name: f for f in dc.fields(cls)}
|
||||
|
||||
for name, type_hint in hints.items():
|
||||
if name.startswith("_") or name in skip_fields.get(suffix, set()):
|
||||
continue
|
||||
|
||||
py_type = resolve_pydantic_type(type_hint)
|
||||
|
||||
# Update schemas: all fields optional
|
||||
if suffix == "Update" and "Optional" not in py_type:
|
||||
py_type = f"Optional[{py_type}]"
|
||||
|
||||
field = fields.get(name)
|
||||
default = get_field_default(field) if field else dc.MISSING
|
||||
|
||||
if "Optional" in py_type:
|
||||
lines.append(f" {name}: {py_type} = None")
|
||||
elif default is not dc.MISSING and not callable(default):
|
||||
if isinstance(default, str):
|
||||
lines.append(f' {name}: {py_type} = "{default}"')
|
||||
elif isinstance(default, Enum):
|
||||
lines.append(
|
||||
f" {name}: {py_type} = {default.__class__.__name__}.{default.name}"
|
||||
)
|
||||
else:
|
||||
lines.append(f" {name}: {py_type} = {default!r}")
|
||||
else:
|
||||
lines.append(f" {name}: {py_type}")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
def generate_pydantic() -> dict[str, str]:
|
||||
"""Generate all Pydantic schema files."""
|
||||
files = {}
|
||||
|
||||
# base.py
|
||||
files["base.py"] = "\n".join(
|
||||
[
|
||||
'"""Pydantic Base Schema - GENERATED FILE"""',
|
||||
"",
|
||||
"from pydantic import BaseModel, ConfigDict",
|
||||
"",
|
||||
"",
|
||||
"class BaseSchema(BaseModel):",
|
||||
' """Base schema with ORM mode."""',
|
||||
" model_config = ConfigDict(from_attributes=True)",
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
# Schema files per model
|
||||
for cls in DATACLASSES:
|
||||
module_name = cls.__name__.replace("Transcode", "").replace("Media", "").lower()
|
||||
|
||||
lines = [
|
||||
f'"""{cls.__name__} Schemas - GENERATED FILE"""',
|
||||
"",
|
||||
"from datetime import datetime",
|
||||
"from enum import Enum",
|
||||
"from typing import Any, Dict, List, Optional",
|
||||
"from uuid import UUID",
|
||||
"",
|
||||
"from .base import BaseSchema",
|
||||
"",
|
||||
]
|
||||
|
||||
# Add enum if present
|
||||
hints = get_type_hints(cls)
|
||||
for type_hint in hints.values():
|
||||
base, _ = unwrap_optional(type_hint)
|
||||
if isinstance(base, type) and issubclass(base, Enum):
|
||||
lines.extend(
|
||||
[
|
||||
"",
|
||||
f"class {base.__name__}(str, Enum):",
|
||||
]
|
||||
)
|
||||
for m in base:
|
||||
lines.append(f' {m.name} = "{m.value}"')
|
||||
lines.append("")
|
||||
break
|
||||
|
||||
# Schemas
|
||||
for suffix in ["Create", "Update", "Response"]:
|
||||
lines.append("")
|
||||
lines.extend(generate_pydantic_schema(cls, suffix))
|
||||
|
||||
lines.append("")
|
||||
files[f"{module_name}.py"] = "\n".join(lines)
|
||||
|
||||
# __init__.py
|
||||
imports = ["from .base import BaseSchema"]
|
||||
all_exports = ['"BaseSchema"']
|
||||
|
||||
for cls in DATACLASSES:
|
||||
name = cls.__name__.replace("Transcode", "").replace("Media", "")
|
||||
module = name.lower()
|
||||
imports.append(
|
||||
f"from .{module} import {name}Create, {name}Update, {name}Response"
|
||||
)
|
||||
all_exports.extend([f'"{name}Create"', f'"{name}Update"', f'"{name}Response"'])
|
||||
|
||||
# Add enum export
|
||||
hints = get_type_hints(cls)
|
||||
for type_hint in hints.values():
|
||||
base, _ = unwrap_optional(type_hint)
|
||||
if isinstance(base, type) and issubclass(base, Enum):
|
||||
imports.append(f"from .{module} import {base.__name__}")
|
||||
all_exports.append(f'"{base.__name__}"')
|
||||
break
|
||||
|
||||
files["__init__.py"] = "\n".join(
|
||||
[
|
||||
'"""API Schemas - GENERATED FILE"""',
|
||||
"",
|
||||
*imports,
|
||||
"",
|
||||
f"__all__ = [{', '.join(all_exports)}]",
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
return files
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# TypeScript Generator
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def resolve_ts_type(type_hint: Any) -> str:
|
||||
"""Resolve Python type to TypeScript type string."""
|
||||
base, optional = unwrap_optional(type_hint)
|
||||
origin = get_origin_name(base)
|
||||
type_name = get_type_name(base)
|
||||
|
||||
# Look up resolver by origin, type name, base type, or enum
|
||||
resolver = (
|
||||
TS_RESOLVERS.get(origin)
|
||||
or TS_RESOLVERS.get(type_name)
|
||||
or TS_RESOLVERS.get(base)
|
||||
or (
|
||||
TS_RESOLVERS["enum"]
|
||||
if isinstance(base, type) and issubclass(base, Enum)
|
||||
else None
|
||||
)
|
||||
)
|
||||
|
||||
result = resolver(base) if resolver else "string"
|
||||
return f"{result} | null" if optional else result
|
||||
|
||||
|
||||
def generate_ts_interface(cls: type) -> list[str]:
|
||||
"""Generate TypeScript interface lines from dataclass."""
|
||||
lines = [f"export interface {cls.__name__} {{"]
|
||||
|
||||
for name, type_hint in get_type_hints(cls).items():
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
ts_type = resolve_ts_type(type_hint)
|
||||
lines.append(f" {name}: {ts_type};")
|
||||
|
||||
lines.append("}")
|
||||
return lines
|
||||
|
||||
|
||||
def generate_typescript() -> str:
|
||||
"""Generate complete TypeScript file."""
|
||||
lines = [
|
||||
"/**",
|
||||
" * MPR TypeScript Types - GENERATED FILE",
|
||||
" *",
|
||||
" * Do not edit directly. Modify schema/models/*.py and run:",
|
||||
" * python schema/generate.py --typescript",
|
||||
" */",
|
||||
"",
|
||||
]
|
||||
|
||||
# Enums as union types
|
||||
for enum in ENUMS:
|
||||
values = " | ".join(f'"{m.value}"' for m in enum)
|
||||
lines.append(f"export type {enum.__name__} = {values};")
|
||||
lines.append("")
|
||||
|
||||
# Interfaces
|
||||
for cls in DATACLASSES:
|
||||
lines.extend(generate_ts_interface(cls))
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Proto Generator
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def resolve_proto_type(type_hint: Any) -> tuple[str, bool]:
|
||||
"""Resolve Python type to proto type. Returns (type, is_optional)."""
|
||||
base, optional = unwrap_optional(type_hint)
|
||||
origin = get_origin_name(base)
|
||||
|
||||
# Look up resolver by origin or base type
|
||||
resolver = PROTO_RESOLVERS.get(origin) or PROTO_RESOLVERS.get(base)
|
||||
|
||||
if resolver:
|
||||
result = resolver(base)
|
||||
is_repeated = result.startswith("repeated")
|
||||
return result, optional and not is_repeated
|
||||
|
||||
return "string", optional
|
||||
|
||||
|
||||
def generate_proto_message(cls: type) -> list[str]:
|
||||
"""Generate proto message lines from dataclass."""
|
||||
lines = [f"message {cls.__name__} {{"]
|
||||
|
||||
hints = get_type_hints(cls)
|
||||
if not hints:
|
||||
lines.append(" // Empty")
|
||||
else:
|
||||
for i, (name, type_hint) in enumerate(hints.items(), 1):
|
||||
proto_type, optional = resolve_proto_type(type_hint)
|
||||
prefix = (
|
||||
"optional "
|
||||
if optional and not proto_type.startswith("repeated")
|
||||
else ""
|
||||
)
|
||||
lines.append(f" {prefix}{proto_type} {name} = {i};")
|
||||
|
||||
lines.append("}")
|
||||
return lines
|
||||
|
||||
|
||||
def generate_proto() -> str:
|
||||
"""Generate complete proto file."""
|
||||
lines = [
|
||||
"// MPR Worker Service - GENERATED FILE",
|
||||
"//",
|
||||
"// Do not edit directly. Modify schema/models/grpc.py and run:",
|
||||
"// python schema/generate.py --proto",
|
||||
"",
|
||||
'syntax = "proto3";',
|
||||
"",
|
||||
f"package {GRPC_SERVICE['package']};",
|
||||
"",
|
||||
f"service {GRPC_SERVICE['name']} {{",
|
||||
]
|
||||
|
||||
# Methods
|
||||
for m in GRPC_SERVICE["methods"]:
|
||||
req = m["request"].__name__
|
||||
resp = m["response"].__name__
|
||||
returns = f"stream {resp}" if m["stream_response"] else resp
|
||||
lines.append(f" rpc {m['name']}({req}) returns ({returns});")
|
||||
|
||||
lines.extend(["}", ""])
|
||||
|
||||
# Messages
|
||||
for cls in GRPC_MESSAGES:
|
||||
lines.extend(generate_proto_message(cls))
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Writers
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def write_file(path: Path, content: str) -> None:
|
||||
"""Write content to file, creating directories as needed."""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(content)
|
||||
print(f" {path}")
|
||||
|
||||
|
||||
def write_django(output_dir: Path) -> None:
|
||||
"""Write Django models."""
|
||||
write_file(output_dir / "mpr" / "media_assets" / "models.py", generate_django())
|
||||
|
||||
|
||||
def write_pydantic(output_dir: Path) -> None:
|
||||
"""Write Pydantic schemas."""
|
||||
schemas_dir = output_dir / "api" / "schemas"
|
||||
for filename, content in generate_pydantic().items():
|
||||
write_file(schemas_dir / filename, content)
|
||||
|
||||
|
||||
def write_typescript(output_dir: Path) -> None:
|
||||
"""Write TypeScript types."""
|
||||
write_file(
|
||||
output_dir / "ui" / "timeline" / "src" / "types.ts", generate_typescript()
|
||||
)
|
||||
|
||||
|
||||
def write_proto(output_dir: Path) -> None:
|
||||
"""Write proto and generate stubs."""
|
||||
proto_dir = output_dir / "grpc" / "protos"
|
||||
proto_path = proto_dir / "worker.proto"
|
||||
write_file(proto_path, generate_proto())
|
||||
|
||||
# Generate Python stubs
|
||||
grpc_dir = output_dir / "grpc"
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
"-m",
|
||||
"grpc_tools.protoc",
|
||||
f"-I{proto_dir}",
|
||||
f"--python_out={grpc_dir}",
|
||||
f"--grpc_python_out={grpc_dir}",
|
||||
str(proto_path),
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
print(f" {grpc_dir}/worker_pb2.py")
|
||||
print(f" {grpc_dir}/worker_pb2_grpc.py")
|
||||
else:
|
||||
print(" Warning: grpc_tools failed - pip install grpcio-tools")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Main
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Generate from schema")
|
||||
parser.add_argument("--django", action="store_true")
|
||||
parser.add_argument("--pydantic", action="store_true")
|
||||
parser.add_argument("--typescript", action="store_true")
|
||||
parser.add_argument("--proto", action="store_true")
|
||||
parser.add_argument("--all", action="store_true")
|
||||
parser.add_argument("--output", type=Path, default=PROJECT_ROOT)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not any([args.django, args.pydantic, args.typescript, args.proto, args.all]):
|
||||
args.all = True
|
||||
|
||||
print(f"Generating to {args.output}\n")
|
||||
|
||||
targets: list[tuple[bool, str, Callable]] = [
|
||||
(args.django or args.all, "Django", write_django),
|
||||
(args.pydantic or args.all, "Pydantic", write_pydantic),
|
||||
(args.typescript or args.all, "TypeScript", write_typescript),
|
||||
(args.proto or args.all, "Proto", write_proto),
|
||||
]
|
||||
|
||||
for enabled, name, writer in targets:
|
||||
if enabled:
|
||||
print(f"{name}:")
|
||||
writer(args.output)
|
||||
print()
|
||||
|
||||
print("Done!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user