remove duplicated code
This commit is contained in:
427
modelgen/generator/pydantic.py
Normal file
427
modelgen/generator/pydantic.py
Normal file
@@ -0,0 +1,427 @@
|
||||
"""
|
||||
Pydantic Generator
|
||||
|
||||
Generates Pydantic BaseModel classes from model definitions.
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, List, get_type_hints
|
||||
|
||||
from ..helpers import get_origin_name, get_type_name, unwrap_optional
|
||||
from ..loader.schema import EnumDefinition, FieldDefinition, ModelDefinition
|
||||
from ..types import PYDANTIC_RESOLVERS
|
||||
from .base import BaseGenerator
|
||||
|
||||
|
||||
class PydanticGenerator(BaseGenerator):
|
||||
"""Generates Pydantic model files."""
|
||||
|
||||
def file_extension(self) -> str:
|
||||
return ".py"
|
||||
|
||||
def generate(self, models, output_path: Path) -> None:
|
||||
"""Generate Pydantic models to output_path."""
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Detect input type and generate accordingly
|
||||
if hasattr(models, "get_shared_component"):
|
||||
# ConfigLoader (soleprint config)
|
||||
content = self._generate_from_config(models)
|
||||
elif hasattr(models, "models"):
|
||||
# SchemaLoader
|
||||
content = self._generate_from_definitions(
|
||||
models.models, getattr(models, "enums", [])
|
||||
)
|
||||
elif isinstance(models, tuple):
|
||||
# (models, enums) tuple from extractor
|
||||
content = self._generate_from_definitions(models[0], models[1])
|
||||
elif isinstance(models, list):
|
||||
# List of dataclasses (MPR style)
|
||||
content = self._generate_from_dataclasses(models)
|
||||
else:
|
||||
raise ValueError(f"Unsupported input type: {type(models)}")
|
||||
|
||||
output_path.write_text(content)
|
||||
|
||||
def _generate_from_definitions(
|
||||
self, models: List[ModelDefinition], enums: List[EnumDefinition]
|
||||
) -> str:
|
||||
"""Generate from ModelDefinition objects (schema/extract mode)."""
|
||||
lines = self._generate_header()
|
||||
|
||||
# Generate enums
|
||||
for enum_def in enums:
|
||||
lines.extend(self._generate_enum(enum_def))
|
||||
lines.append("")
|
||||
|
||||
# Generate models
|
||||
for model_def in models:
|
||||
lines.extend(self._generate_model_from_definition(model_def))
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _generate_from_dataclasses(self, dataclasses: List[type]) -> str:
|
||||
"""Generate from Python dataclasses (MPR style)."""
|
||||
lines = self._generate_header()
|
||||
|
||||
# Collect and generate enums first
|
||||
enums_generated = set()
|
||||
for cls in dataclasses:
|
||||
hints = get_type_hints(cls)
|
||||
for type_hint in hints.values():
|
||||
base, _ = unwrap_optional(type_hint)
|
||||
if isinstance(base, type) and issubclass(base, Enum):
|
||||
if base.__name__ not in enums_generated:
|
||||
lines.extend(self._generate_enum_from_python(base))
|
||||
lines.append("")
|
||||
enums_generated.add(base.__name__)
|
||||
|
||||
# Generate models
|
||||
for cls in dataclasses:
|
||||
lines.extend(self._generate_model_from_dataclass(cls))
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _generate_header(self) -> List[str]:
|
||||
"""Generate file header."""
|
||||
return [
|
||||
'"""',
|
||||
"Pydantic Models - GENERATED FILE",
|
||||
"",
|
||||
"Do not edit directly. Regenerate using modelgen.",
|
||||
'"""',
|
||||
"",
|
||||
"from datetime import datetime",
|
||||
"from enum import Enum",
|
||||
"from typing import Any, Dict, List, Optional",
|
||||
"from uuid import UUID",
|
||||
"",
|
||||
"from pydantic import BaseModel, Field",
|
||||
"",
|
||||
]
|
||||
|
||||
def _generate_enum(self, enum_def: EnumDefinition) -> List[str]:
|
||||
"""Generate Pydantic enum from EnumDefinition."""
|
||||
lines = [f"class {enum_def.name}(str, Enum):"]
|
||||
for name, value in enum_def.values:
|
||||
lines.append(f' {name} = "{value}"')
|
||||
return lines
|
||||
|
||||
def _generate_enum_from_python(self, enum_cls: type) -> List[str]:
|
||||
"""Generate Pydantic enum from Python Enum."""
|
||||
lines = [f"class {enum_cls.__name__}(str, Enum):"]
|
||||
for member in enum_cls:
|
||||
lines.append(f' {member.name} = "{member.value}"')
|
||||
return lines
|
||||
|
||||
def _generate_model_from_definition(self, model_def: ModelDefinition) -> List[str]:
|
||||
"""Generate Pydantic model from ModelDefinition."""
|
||||
docstring = model_def.docstring or model_def.name
|
||||
lines = [
|
||||
f"class {model_def.name}(BaseModel):",
|
||||
f' """{docstring.strip().split(chr(10))[0]}"""',
|
||||
]
|
||||
|
||||
if not model_def.fields:
|
||||
lines.append(" pass")
|
||||
else:
|
||||
for field in model_def.fields:
|
||||
py_type = self._resolve_type(field.type_hint, field.optional)
|
||||
default = self._format_default(field.default, field.optional)
|
||||
lines.append(f" {field.name}: {py_type}{default}")
|
||||
|
||||
return lines
|
||||
|
||||
def _generate_model_from_dataclass(self, cls: type) -> List[str]:
|
||||
"""Generate Pydantic model from a dataclass."""
|
||||
import dataclasses as dc
|
||||
|
||||
docstring = cls.__doc__ or cls.__name__
|
||||
lines = [
|
||||
f"class {cls.__name__}(BaseModel):",
|
||||
f' """{docstring.strip().split(chr(10))[0]}"""',
|
||||
]
|
||||
|
||||
hints = get_type_hints(cls)
|
||||
fields = {f.name: f for f in dc.fields(cls)}
|
||||
|
||||
for name, type_hint in hints.items():
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
|
||||
field = fields.get(name)
|
||||
default_val = dc.MISSING
|
||||
if field:
|
||||
if field.default is not dc.MISSING:
|
||||
default_val = field.default
|
||||
|
||||
py_type = self._resolve_type(type_hint, False)
|
||||
default = self._format_default(default_val, "Optional" in py_type)
|
||||
lines.append(f" {name}: {py_type}{default}")
|
||||
|
||||
return lines
|
||||
|
||||
def _resolve_type(self, type_hint: Any, optional: bool) -> str:
|
||||
"""Resolve Python type to Pydantic type string."""
|
||||
base, is_optional = unwrap_optional(type_hint)
|
||||
optional = optional or is_optional
|
||||
origin = get_origin_name(base)
|
||||
type_name = get_type_name(base)
|
||||
|
||||
# Look up resolver
|
||||
resolver = (
|
||||
PYDANTIC_RESOLVERS.get(origin)
|
||||
or PYDANTIC_RESOLVERS.get(type_name)
|
||||
or PYDANTIC_RESOLVERS.get(base)
|
||||
or (
|
||||
PYDANTIC_RESOLVERS["enum"]
|
||||
if isinstance(base, type) and issubclass(base, Enum)
|
||||
else None
|
||||
)
|
||||
)
|
||||
|
||||
result = resolver(base) if resolver else "str"
|
||||
return f"Optional[{result}]" if optional else result
|
||||
|
||||
def _format_default(self, default: Any, optional: bool) -> str:
|
||||
"""Format default value for field."""
|
||||
import dataclasses as dc
|
||||
|
||||
if optional:
|
||||
return " = None"
|
||||
if default is dc.MISSING or default is None:
|
||||
return ""
|
||||
if isinstance(default, str):
|
||||
return f' = "{default}"'
|
||||
if isinstance(default, Enum):
|
||||
return f" = {default.__class__.__name__}.{default.name}"
|
||||
if callable(default):
|
||||
return " = Field(default_factory=list)" if "list" in str(default) else ""
|
||||
return f" = {default!r}"
|
||||
|
||||
def _generate_from_config(self, config) -> str:
|
||||
"""Generate from ConfigLoader (soleprint config.json mode)."""
|
||||
# Get component names from config
|
||||
config_comp = config.get_shared_component("config")
|
||||
data_comp = config.get_shared_component("data")
|
||||
|
||||
data_flow_sys = config.get_system("data_flow")
|
||||
doc_sys = config.get_system("documentation")
|
||||
exec_sys = config.get_system("execution")
|
||||
|
||||
connector_comp = config.get_component("data_flow", "connector")
|
||||
pulse_comp = config.get_component("data_flow", "composed")
|
||||
|
||||
pattern_comp = config.get_component("documentation", "pattern")
|
||||
doc_composed = config.get_component("documentation", "composed")
|
||||
|
||||
tool_comp = config.get_component("execution", "utility")
|
||||
monitor_comp = config.get_component("execution", "watcher")
|
||||
cabinet_comp = config.get_component("execution", "container")
|
||||
exec_composed = config.get_component("execution", "composed")
|
||||
|
||||
return f'''"""
|
||||
Pydantic models - Generated from {config.framework.name}.config.json
|
||||
|
||||
DO NOT EDIT MANUALLY - Regenerate from config
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class Status(str, Enum):
|
||||
PENDING = "pending"
|
||||
PLANNED = "planned"
|
||||
BUILDING = "building"
|
||||
DEV = "dev"
|
||||
LIVE = "live"
|
||||
READY = "ready"
|
||||
|
||||
|
||||
class System(str, Enum):
|
||||
{data_flow_sys.name.upper()} = "{data_flow_sys.name}"
|
||||
{doc_sys.name.upper()} = "{doc_sys.name}"
|
||||
{exec_sys.name.upper()} = "{exec_sys.name}"
|
||||
|
||||
|
||||
class ToolType(str, Enum):
|
||||
APP = "app"
|
||||
CLI = "cli"
|
||||
|
||||
|
||||
# === Shared Components ===
|
||||
|
||||
|
||||
class {config_comp.title}(BaseModel):
|
||||
"""{config_comp.description}. Shared across {data_flow_sys.name}, {exec_sys.name}."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
config_path: Optional[str] = None
|
||||
|
||||
|
||||
class {data_comp.title}(BaseModel):
|
||||
"""{data_comp.description}. Shared across all systems."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
source_template: Optional[str] = None
|
||||
data_path: Optional[str] = None
|
||||
|
||||
|
||||
# === System-Specific Components ===
|
||||
|
||||
|
||||
class {connector_comp.title}(BaseModel):
|
||||
"""{connector_comp.description} ({data_flow_sys.name})."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
system: Literal["{data_flow_sys.name}"] = "{data_flow_sys.name}"
|
||||
mock: Optional[bool] = None
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class {pattern_comp.title}(BaseModel):
|
||||
"""{pattern_comp.description} ({doc_sys.name})."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
template_path: Optional[str] = None
|
||||
system: Literal["{doc_sys.name}"] = "{doc_sys.name}"
|
||||
|
||||
|
||||
class {tool_comp.title}(BaseModel):
|
||||
"""{tool_comp.description} ({exec_sys.name})."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
system: Literal["{exec_sys.name}"] = "{exec_sys.name}"
|
||||
type: Optional[ToolType] = None
|
||||
description: Optional[str] = None
|
||||
path: Optional[str] = None
|
||||
url: Optional[str] = None
|
||||
cli: Optional[str] = None
|
||||
|
||||
|
||||
class {monitor_comp.title}(BaseModel):
|
||||
"""{monitor_comp.description} ({exec_sys.name})."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
system: Literal["{exec_sys.name}"] = "{exec_sys.name}"
|
||||
|
||||
|
||||
class {cabinet_comp.title}(BaseModel):
|
||||
"""{cabinet_comp.description} ({exec_sys.name})."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
tools: List[{tool_comp.title}] = Field(default_factory=list)
|
||||
system: Literal["{exec_sys.name}"] = "{exec_sys.name}"
|
||||
|
||||
|
||||
# === Composed Types ===
|
||||
|
||||
|
||||
class {pulse_comp.title}(BaseModel):
|
||||
"""{pulse_comp.description} ({data_flow_sys.name}). Formula: {pulse_comp.formula}."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
{connector_comp.name}: Optional[{connector_comp.title}] = None
|
||||
{config_comp.name}: Optional[{config_comp.title}] = None
|
||||
{data_comp.name}: Optional[{data_comp.title}] = None
|
||||
system: Literal["{data_flow_sys.name}"] = "{data_flow_sys.name}"
|
||||
|
||||
|
||||
class {doc_composed.title}(BaseModel):
|
||||
"""{doc_composed.description} ({doc_sys.name}). Formula: {doc_composed.formula}."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
template: Optional[{pattern_comp.title}] = None
|
||||
{data_comp.name}: Optional[{data_comp.title}] = None
|
||||
output_{data_comp.name}: Optional[{data_comp.title}] = None
|
||||
system: Literal["{doc_sys.name}"] = "{doc_sys.name}"
|
||||
|
||||
|
||||
class {exec_composed.title}(BaseModel):
|
||||
"""{exec_composed.description} ({exec_sys.name}). Formula: {exec_composed.formula}."""
|
||||
|
||||
name: str # Unique identifier
|
||||
slug: str # URL-friendly identifier
|
||||
title: str # Display title for UI
|
||||
status: Optional[Status] = None
|
||||
cabinet: Optional[{cabinet_comp.title}] = None
|
||||
{config_comp.name}: Optional[{config_comp.title}] = None
|
||||
{data_comp.plural}: List[{data_comp.title}] = Field(default_factory=list)
|
||||
system: Literal["{exec_sys.name}"] = "{exec_sys.name}"
|
||||
|
||||
|
||||
# === Collection wrappers for JSON files ===
|
||||
|
||||
|
||||
class {config_comp.title}Collection(BaseModel):
|
||||
items: List[{config_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {data_comp.title}Collection(BaseModel):
|
||||
items: List[{data_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {connector_comp.title}Collection(BaseModel):
|
||||
items: List[{connector_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {pattern_comp.title}Collection(BaseModel):
|
||||
items: List[{pattern_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {tool_comp.title}Collection(BaseModel):
|
||||
items: List[{tool_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {monitor_comp.title}Collection(BaseModel):
|
||||
items: List[{monitor_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {cabinet_comp.title}Collection(BaseModel):
|
||||
items: List[{cabinet_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {pulse_comp.title}Collection(BaseModel):
|
||||
items: List[{pulse_comp.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {doc_composed.title}Collection(BaseModel):
|
||||
items: List[{doc_composed.title}] = Field(default_factory=list)
|
||||
|
||||
|
||||
class {exec_composed.title}Collection(BaseModel):
|
||||
items: List[{exec_composed.title}] = Field(default_factory=list)
|
||||
'''
|
||||
Reference in New Issue
Block a user