fixes and modelgen insert
This commit is contained in:
37
tools/modelgen/loader/__init__.py
Normal file
37
tools/modelgen/loader/__init__.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Loader - Input source handlers for modelgen.
|
||||
|
||||
Supported loaders:
|
||||
- ConfigLoader: Load from soleprint config.json
|
||||
- SchemaLoader: Load from Python dataclasses in schema/ folder
|
||||
- Extractors: Extract from existing codebases (Django, SQLAlchemy, Prisma)
|
||||
"""
|
||||
|
||||
from .config import ConfigLoader, load_config
|
||||
from .extract import EXTRACTORS, BaseExtractor, DjangoExtractor
|
||||
from .schema import (
|
||||
EnumDefinition,
|
||||
FieldDefinition,
|
||||
GrpcServiceDefinition,
|
||||
ModelDefinition,
|
||||
SchemaLoader,
|
||||
load_schema,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
# Config loader
|
||||
"ConfigLoader",
|
||||
"load_config",
|
||||
# Schema loader
|
||||
"SchemaLoader",
|
||||
"load_schema",
|
||||
# Model definitions
|
||||
"ModelDefinition",
|
||||
"FieldDefinition",
|
||||
"EnumDefinition",
|
||||
"GrpcServiceDefinition",
|
||||
# Extractors
|
||||
"BaseExtractor",
|
||||
"DjangoExtractor",
|
||||
"EXTRACTORS",
|
||||
]
|
||||
116
tools/modelgen/loader/config.py
Normal file
116
tools/modelgen/loader/config.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""
|
||||
Configuration Loader
|
||||
|
||||
Loads and validates framework configuration files (soleprint config.json style).
|
||||
"""
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
@dataclass
class FrameworkConfig:
    """Framework metadata (the "framework" section of config.json).

    All fields are required: instances are built via ``FrameworkConfig(**raw)``
    in ConfigLoader._parse_framework, so the JSON keys must match exactly.
    """

    name: str
    slug: str
    version: str
    description: str
    tagline: str
    icon: str
    hub_port: int  # integer port number for the framework hub
|
||||
|
||||
|
||||
@dataclass
class SystemConfig:
    """System configuration (one entry of the "systems" list in config.json).

    Only ``key`` and ``name`` are required; the display-oriented fields
    default to empty strings.
    """

    key: str  # unique lookup key (used by ConfigLoader.get_system)
    name: str
    slug: str = ""
    title: str = ""
    tagline: str = ""
    icon: str = ""
|
||||
|
||||
|
||||
@dataclass
class ComponentConfig:
    """Component configuration (one entry of the "components" maps in config.json).

    ``plural`` and ``formula`` are optional in the JSON and default to None.
    """

    name: str
    title: str
    description: str
    plural: Optional[str] = None
    formula: Optional[str] = None
|
||||
|
||||
|
||||
class ConfigLoader:
    """Loads and parses framework configuration (soleprint config.json style).

    Typical use: ``ConfigLoader(path).load()`` — ``load`` populates
    ``framework``, ``systems`` and ``components`` and returns the loader
    for chaining.
    """

    # Per-system sections of the "components" mapping that are parsed in
    # addition to "shared".
    _SYSTEM_KEYS = ("data_flow", "documentation", "execution")

    def __init__(self, config_path: Path):
        # Normalize so callers may pass str or Path.
        self.config_path = Path(config_path)
        self.raw_config: Dict[str, Any] = {}
        self.framework: Optional[FrameworkConfig] = None
        self.systems: List[SystemConfig] = []
        # Component maps keyed by section: "shared" or one of _SYSTEM_KEYS.
        self.components: Dict[str, Dict[str, ComponentConfig]] = {}

    def load(self) -> "ConfigLoader":
        """Load configuration from file and parse all sections.

        Returns:
            self, so calls can be chained.

        Raises:
            FileNotFoundError: if the config file does not exist.
            KeyError: if a required top-level section ("framework",
                "systems", "components") is missing.
        """
        # Config files are conventionally UTF-8; don't depend on the
        # platform's locale encoding.
        with open(self.config_path, encoding="utf-8") as f:
            self.raw_config = json.load(f)

        self._parse_framework()
        self._parse_systems()
        self._parse_components()

        return self

    def _parse_framework(self):
        """Parse framework metadata into ``self.framework``."""
        self.framework = FrameworkConfig(**self.raw_config["framework"])

    def _parse_systems(self):
        """Parse system configurations into ``self.systems``."""
        # "system", not "sys", so the stdlib module name isn't shadowed.
        for system in self.raw_config["systems"]:
            self.systems.append(SystemConfig(**system))

    def _parse_components(self):
        """Parse shared and per-system component configurations."""
        comps = self.raw_config["components"]

        # Shared components
        self.components["shared"] = {
            key: ComponentConfig(**value)
            for key, value in comps.get("shared", {}).items()
        }

        # System-specific components (missing sections yield empty maps)
        for system_key in self._SYSTEM_KEYS:
            self.components[system_key] = {
                comp_key: ComponentConfig(**comp_value)
                for comp_key, comp_value in comps.get(system_key, {}).items()
            }

    def get_system(self, key: str) -> Optional[SystemConfig]:
        """Get system config by key, or None if not found."""
        for system in self.systems:
            if system.key == key:
                return system
        return None

    def get_component(
        self, system_key: str, component_key: str
    ) -> Optional[ComponentConfig]:
        """Get a component config, or None if either key is unknown."""
        return self.components.get(system_key, {}).get(component_key)

    def get_shared_component(self, key: str) -> Optional[ComponentConfig]:
        """Get a shared component config, or None if not found."""
        return self.components.get("shared", {}).get(key)
|
||||
|
||||
|
||||
def load_config(config_path: str | Path) -> ConfigLoader:
    """Construct a ConfigLoader for *config_path* and run its load step."""
    return ConfigLoader(config_path).load()
|
||||
20
tools/modelgen/loader/extract/__init__.py
Normal file
20
tools/modelgen/loader/extract/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""
|
||||
Extractors - Extract model definitions from existing codebases.
|
||||
|
||||
Supported frameworks:
|
||||
- Django: Extract from Django ORM models
|
||||
- SQLAlchemy: Extract from SQLAlchemy models (planned)
|
||||
- Prisma: Extract from Prisma schema (planned)
|
||||
"""
|
||||
|
||||
from typing import Dict, Type
|
||||
|
||||
from .base import BaseExtractor
|
||||
from .django import DjangoExtractor
|
||||
|
||||
# Registry of available extractors, keyed by framework identifier.
# Only "django" is implemented; SQLAlchemy and Prisma entries are planned
# (see module docstring).
EXTRACTORS: Dict[str, Type[BaseExtractor]] = {
    "django": DjangoExtractor,
}
|
||||
|
||||
__all__ = ["BaseExtractor", "DjangoExtractor", "EXTRACTORS"]
|
||||
38
tools/modelgen/loader/extract/base.py
Normal file
38
tools/modelgen/loader/extract/base.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""
|
||||
Base Extractor
|
||||
|
||||
Abstract base class for model extractors.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from ..schema import EnumDefinition, ModelDefinition
|
||||
|
||||
|
||||
class BaseExtractor(ABC):
    """Abstract base for codebase model extractors.

    Subclasses implement ``detect`` (can this extractor handle the source
    path?) and ``extract`` (produce model/enum definitions from it).
    """

    def __init__(self, source_path: Path):
        # Normalize so callers may pass str or Path.
        self.source_path = Path(source_path)

    # NOTE: a docstring alone is a valid abstract-method body; the
    # original trailing `pass` statements were redundant and are removed.
    @abstractmethod
    def extract(self) -> tuple[List[ModelDefinition], List[EnumDefinition]]:
        """
        Extract model definitions from source codebase.

        Returns:
            Tuple of (models, enums)
        """

    @abstractmethod
    def detect(self) -> bool:
        """
        Detect if this extractor can handle the source path.

        Returns:
            True if this extractor can handle the source
        """
|
||||
237
tools/modelgen/loader/extract/django.py
Normal file
237
tools/modelgen/loader/extract/django.py
Normal file
@@ -0,0 +1,237 @@
|
||||
"""
|
||||
Django Extractor
|
||||
|
||||
Extracts model definitions from Django ORM models.
|
||||
"""
|
||||
|
||||
import ast
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from ..schema import EnumDefinition, FieldDefinition, ModelDefinition
|
||||
from .base import BaseExtractor
|
||||
|
||||
# Django field type mappings to Python types.
# NOTE: the values are intentionally heterogeneous — real Python types
# (str, int, float, bool) for simple scalars, and marker strings ("UUID",
# "bigint", "datetime", "dict", "FK", "M2M") for types that downstream
# consumers of FieldDefinition.type_hint resolve themselves.
DJANGO_FIELD_TYPES = {
    "CharField": str,
    "TextField": str,
    "EmailField": str,
    "URLField": str,
    "SlugField": str,
    "UUIDField": "UUID",
    "IntegerField": int,
    "BigIntegerField": "bigint",
    "SmallIntegerField": int,
    "PositiveIntegerField": int,
    "FloatField": float,
    "DecimalField": float,
    "BooleanField": bool,
    "NullBooleanField": bool,
    "DateField": "datetime",
    "DateTimeField": "datetime",
    "TimeField": "datetime",
    "JSONField": "dict",
    "ForeignKey": "FK",
    "OneToOneField": "FK",
    "ManyToManyField": "M2M",
}
|
||||
|
||||
|
||||
class DjangoExtractor(BaseExtractor):
    """Extracts models from Django ORM code via static AST parsing.

    No Django runtime is required: every ``models.py`` under the source
    path is parsed with :mod:`ast`, so extraction works on any checkout.
    """

    def detect(self) -> bool:
        """Check if this is a Django project.

        Returns:
            True if ``manage.py`` or ``settings.py`` exists at the source
            root, or any ``models.py`` imports ``django.db.models``.
        """
        # Look for manage.py or settings.py at the root.
        manage_py = self.source_path / "manage.py"
        settings_py = self.source_path / "settings.py"

        if manage_py.exists():
            return True

        # Check for Django imports in any models.py. Source files are
        # conventionally UTF-8; don't depend on the locale encoding.
        for models_file in self.source_path.rglob("models.py"):
            content = models_file.read_text(encoding="utf-8")
            if "from django.db import models" in content:
                return True

        # NOTE(review): only a root-level settings.py is checked here;
        # nested <project>/settings.py layouts rely on the checks above.
        return settings_py.exists()

    def extract(self) -> tuple[List[ModelDefinition], List[EnumDefinition]]:
        """Extract Django models using AST parsing.

        Returns:
            Tuple of (models, enums) aggregated over every ``models.py``
            found under the source path.
        """
        models: List[ModelDefinition] = []
        enums: List[EnumDefinition] = []

        # Find all models.py files
        for models_file in self.source_path.rglob("models.py"):
            file_models, file_enums = self._extract_from_file(models_file)
            models.extend(file_models)
            enums.extend(file_enums)

        return models, enums

    def _extract_from_file(
        self, file_path: Path
    ) -> tuple[List[ModelDefinition], List[EnumDefinition]]:
        """Extract models and choice enums from a single models.py file."""
        models: List[ModelDefinition] = []
        enums: List[EnumDefinition] = []

        content = file_path.read_text(encoding="utf-8")
        tree = ast.parse(content)

        # ast.walk also visits nested classes, so TextChoices declared
        # inside a model body are picked up too.
        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                # Check if it inherits from models.Model
                if self._is_django_model(node):
                    model_def = self._parse_model_class(node)
                    if model_def:
                        models.append(model_def)
                # Check if it's a TextChoices/IntegerChoices enum
                elif self._is_django_choices(node):
                    enum_def = self._parse_choices_class(node)
                    if enum_def:
                        enums.append(enum_def)

        return models, enums

    def _is_django_model(self, node: ast.ClassDef) -> bool:
        """Check if a class inherits from models.Model (or a known auth base)."""
        for base in node.bases:
            if isinstance(base, ast.Attribute):
                # e.g. models.Model
                if base.attr == "Model":
                    return True
            elif isinstance(base, ast.Name):
                # e.g. Model / AbstractUser imported directly
                if base.id in ("Model", "AbstractUser", "AbstractBaseUser"):
                    return True
        return False

    def _is_django_choices(self, node: ast.ClassDef) -> bool:
        """Check if a class is a Django TextChoices/IntegerChoices."""
        for base in node.bases:
            if isinstance(base, ast.Attribute):
                # e.g. models.TextChoices
                if base.attr in ("TextChoices", "IntegerChoices"):
                    return True
            elif isinstance(base, ast.Name):
                if base.id in ("TextChoices", "IntegerChoices"):
                    return True
        return False

    def _parse_model_class(self, node: ast.ClassDef) -> Optional[ModelDefinition]:
        """Parse a Django model class into a ModelDefinition."""
        fields = []

        for item in node.body:
            if isinstance(item, ast.Assign):
                # Classic style: name = models.CharField(...)
                field_def = self._parse_field_assignment(item)
                if field_def:
                    fields.append(field_def)
            elif isinstance(item, ast.AnnAssign):
                # Handle annotated assignments (Django 4.0+ style)
                field_def = self._parse_annotated_field(item)
                if field_def:
                    fields.append(field_def)

        return ModelDefinition(
            name=node.name,
            fields=fields,
            docstring=ast.get_docstring(node),
        )

    def _parse_field_assignment(self, node: ast.Assign) -> Optional[FieldDefinition]:
        """Parse a field assignment like: name = models.CharField(...)"""
        if not node.targets or not isinstance(node.targets[0], ast.Name):
            return None

        field_name = node.targets[0].id

        # Skip private fields and Meta class
        if field_name.startswith("_") or field_name == "Meta":
            return None

        # Only call expressions are fields; plain constants are skipped.
        if isinstance(node.value, ast.Call):
            return self._parse_field_call(field_name, node.value)

        return None

    def _parse_annotated_field(self, node: ast.AnnAssign) -> Optional[FieldDefinition]:
        """Parse an annotated field assignment (name: T = models.XField(...))."""
        if not isinstance(node.target, ast.Name):
            return None

        field_name = node.target.id

        if field_name.startswith("_"):
            return None

        if node.value and isinstance(node.value, ast.Call):
            return self._parse_field_call(field_name, node.value)

        return None

    def _parse_field_call(
        self, field_name: str, call: ast.Call
    ) -> Optional[FieldDefinition]:
        """Parse a Django field call like models.CharField(max_length=100)."""
        # Field type name: models.CharField -> "CharField"; bare name -> id.
        field_type_name = None

        if isinstance(call.func, ast.Attribute):
            field_type_name = call.func.attr
        elif isinstance(call.func, ast.Name):
            field_type_name = call.func.id

        if not field_type_name:
            return None

        # Map to a Python type; unknown field types fall back to str.
        python_type = DJANGO_FIELD_TYPES.get(field_type_name, str)

        # Only literal keyword values are recognized; dynamic defaults
        # (names, calls) are ignored.
        optional = False
        default = None

        for keyword in call.keywords:
            if keyword.arg == "null":
                if isinstance(keyword.value, ast.Constant):
                    optional = keyword.value.value is True
            elif keyword.arg == "default":
                if isinstance(keyword.value, ast.Constant):
                    default = keyword.value.value

        return FieldDefinition(
            name=field_name,
            type_hint=python_type,
            # Fixed: was `default if default is not None else None`, which
            # is identically `default`.
            default=default,
            optional=optional,
        )

    def _parse_choices_class(self, node: ast.ClassDef) -> Optional[EnumDefinition]:
        """Parse a Django TextChoices/IntegerChoices class into an EnumDefinition."""
        values = []

        for item in node.body:
            if isinstance(item, ast.Assign):
                if item.targets and isinstance(item.targets[0], ast.Name):
                    name = item.targets[0].id
                    if name.isupper():  # Enum values are typically uppercase
                        # Get the value
                        value = name.lower()  # Default to lowercase name
                        if isinstance(item.value, ast.Constant):
                            value = str(item.value.value)
                        elif isinstance(item.value, ast.Tuple) and item.value.elts:
                            # TextChoices: NAME = "value", "Label"
                            if isinstance(item.value.elts[0], ast.Constant):
                                value = str(item.value.elts[0].value)

                        values.append((name, value))

        if not values:
            return None

        return EnumDefinition(name=node.name, values=values)
|
||||
169
tools/modelgen/loader/schema.py
Normal file
169
tools/modelgen/loader/schema.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""
|
||||
Schema Loader
|
||||
|
||||
Loads Python dataclasses from a schema/ folder.
|
||||
Expects the folder to have an __init__.py that exports:
|
||||
- DATACLASSES: List of dataclass types to generate
|
||||
- ENUMS: List of Enum types to include
|
||||
- GRPC_MESSAGES: (optional) List of gRPC message types
|
||||
- GRPC_SERVICE: (optional) gRPC service definition dict
|
||||
"""
|
||||
|
||||
import dataclasses as dc
|
||||
import importlib.util
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Type, get_type_hints
|
||||
|
||||
|
||||
@dataclass
class FieldDefinition:
    """Represents a model field.

    ``default`` stays ``dataclasses.MISSING`` when the field has no
    default; for fields declared with ``default_factory``, the factory
    callable itself is stored (see SchemaLoader._parse_dataclass).
    """

    name: str
    type_hint: Any  # a type object or a marker string, depending on the loader
    default: Any = dc.MISSING
    optional: bool = False  # True when the hint allows None
|
||||
|
||||
|
||||
@dataclass
class ModelDefinition:
    """Represents a model/dataclass."""

    name: str  # class name of the source dataclass/model
    fields: List[FieldDefinition]
    docstring: Optional[str] = None  # original class docstring, if any
|
||||
|
||||
|
||||
@dataclass
class EnumDefinition:
    """Represents an enum as an ordered list of member (name, value) pairs."""

    name: str
    # (name, value) pairs. NOTE(review): annotated str/str, but
    # SchemaLoader._parse_enum stores raw member values, which may be
    # non-str (e.g. IntEnum) — confirm downstream expectations.
    values: List[tuple[str, str]]  # (name, value) pairs
|
||||
|
||||
|
||||
@dataclass
class GrpcServiceDefinition:
    """Represents a gRPC service."""

    package: str  # defaults to "service" when absent (see SchemaLoader.load)
    name: str  # defaults to "Service" when absent
    methods: List[Dict[str, Any]]  # raw method dicts from GRPC_SERVICE["methods"]
|
||||
|
||||
|
||||
class SchemaLoader:
|
||||
"""Loads model definitions from Python dataclasses in schema/ folder."""
|
||||
|
||||
def __init__(self, schema_path: Path):
|
||||
self.schema_path = Path(schema_path)
|
||||
self.models: List[ModelDefinition] = []
|
||||
self.enums: List[EnumDefinition] = []
|
||||
self.grpc_messages: List[ModelDefinition] = []
|
||||
self.grpc_service: Optional[GrpcServiceDefinition] = None
|
||||
|
||||
def load(self) -> "SchemaLoader":
|
||||
"""Load schema definitions from the schema folder."""
|
||||
init_path = self.schema_path / "__init__.py"
|
||||
|
||||
if not init_path.exists():
|
||||
raise FileNotFoundError(f"Schema folder must have __init__.py: {init_path}")
|
||||
|
||||
# Import the schema module
|
||||
module = self._import_module(init_path)
|
||||
|
||||
# Extract DATACLASSES
|
||||
dataclasses = getattr(module, "DATACLASSES", [])
|
||||
for cls in dataclasses:
|
||||
self.models.append(self._parse_dataclass(cls))
|
||||
|
||||
# Extract ENUMS
|
||||
enums = getattr(module, "ENUMS", [])
|
||||
for enum_cls in enums:
|
||||
self.enums.append(self._parse_enum(enum_cls))
|
||||
|
||||
# Extract GRPC_MESSAGES (optional)
|
||||
grpc_messages = getattr(module, "GRPC_MESSAGES", [])
|
||||
for cls in grpc_messages:
|
||||
self.grpc_messages.append(self._parse_dataclass(cls))
|
||||
|
||||
# Extract GRPC_SERVICE (optional)
|
||||
grpc_service = getattr(module, "GRPC_SERVICE", None)
|
||||
if grpc_service:
|
||||
self.grpc_service = GrpcServiceDefinition(
|
||||
package=grpc_service.get("package", "service"),
|
||||
name=grpc_service.get("name", "Service"),
|
||||
methods=grpc_service.get("methods", []),
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
def _import_module(self, path: Path):
|
||||
"""Import a Python module from a file path."""
|
||||
spec = importlib.util.spec_from_file_location("schema", path)
|
||||
if spec is None or spec.loader is None:
|
||||
raise ImportError(f"Could not load module from {path}")
|
||||
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules["schema"] = module
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
def _parse_dataclass(self, cls: Type) -> ModelDefinition:
|
||||
"""Parse a dataclass into a ModelDefinition."""
|
||||
hints = get_type_hints(cls)
|
||||
fields_info = {f.name: f for f in dc.fields(cls)}
|
||||
|
||||
fields = []
|
||||
for name, type_hint in hints.items():
|
||||
if name.startswith("_"):
|
||||
continue
|
||||
|
||||
field_info = fields_info.get(name)
|
||||
default = dc.MISSING
|
||||
if field_info:
|
||||
if field_info.default is not dc.MISSING:
|
||||
default = field_info.default
|
||||
elif field_info.default_factory is not dc.MISSING:
|
||||
default = field_info.default_factory
|
||||
|
||||
# Check if optional (Union with None)
|
||||
optional = self._is_optional(type_hint)
|
||||
|
||||
fields.append(
|
||||
FieldDefinition(
|
||||
name=name,
|
||||
type_hint=type_hint,
|
||||
default=default,
|
||||
optional=optional,
|
||||
)
|
||||
)
|
||||
|
||||
return ModelDefinition(
|
||||
name=cls.__name__,
|
||||
fields=fields,
|
||||
docstring=cls.__doc__,
|
||||
)
|
||||
|
||||
def _parse_enum(self, enum_cls: Type[Enum]) -> EnumDefinition:
|
||||
"""Parse an Enum into an EnumDefinition."""
|
||||
values = [(m.name, m.value) for m in enum_cls]
|
||||
return EnumDefinition(name=enum_cls.__name__, values=values)
|
||||
|
||||
def _is_optional(self, type_hint: Any) -> bool:
|
||||
"""Check if a type hint is Optional (Union with None)."""
|
||||
from typing import Union, get_args, get_origin
|
||||
|
||||
origin = get_origin(type_hint)
|
||||
if origin is Union:
|
||||
args = get_args(type_hint)
|
||||
return type(None) in args
|
||||
return False
|
||||
|
||||
|
||||
def load_schema(schema_path: str | Path) -> SchemaLoader:
    """Build a SchemaLoader for *schema_path* and run its load step."""
    return SchemaLoader(schema_path).load()
|
||||
Reference in New Issue
Block a user