Files
soleprint/cfg/amar/link/main.py
buenosairesam c5546cf7fc 1.1 changes
2025-12-29 14:17:53 -03:00

106 lines
2.6 KiB
Python

"""
Link Room - Adapter layer between managed apps and soleprint.
Exposes standardized JSON endpoints for data navigation.
Framework-agnostic via pluggable adapters.
"""
import logging
import os
from typing import Optional

from fastapi import FastAPI, HTTPException
# FastAPI application object; the @app.get(...) decorators in this module
# register their route handlers on it.
app = FastAPI(title="Link Room", version="0.1.0")
# Module-wide adapter slot; stays None until get_adapter() first succeeds.
_adapter = None


def get_adapter():
    """Return the shared adapter, building it on the first successful call.

    The adapter kind is read from ADAPTER_TYPE (default "django") and the
    database settings from DB_HOST, DB_PORT, DB_NAME, DB_USER and
    DB_PASSWORD. Once an adapter has been built it is cached in the
    module-level ``_adapter`` and returned on every later call.

    Raises:
        ValueError: if DB_PORT is not an integer, or ADAPTER_TYPE names an
            adapter other than "django".
    """
    global _adapter

    # Fast path: an adapter was already constructed by an earlier call.
    if _adapter is not None:
        return _adapter

    kind = os.getenv("ADAPTER_TYPE", "django")

    # Database settings come from the environment. They are read before the
    # adapter kind is checked, so a malformed DB_PORT is reported first.
    settings = dict(
        host=os.getenv("DB_HOST", "localhost"),
        port=int(os.getenv("DB_PORT", "5432")),
        name=os.getenv("DB_NAME", "amarback"),
        user=os.getenv("DB_USER", "postgres"),
        password=os.getenv("DB_PASSWORD", ""),
    )

    if kind != "django":
        raise ValueError(f"Unknown adapter type: {kind}")

    # Imported here so the adapter package is only loaded when it is needed.
    from adapters.django import DjangoAdapter

    _adapter = DjangoAdapter(settings)
    return _adapter
@app.get("/health")
def health():
    """Health check.

    Reports whether the adapter can be obtained via get_adapter(). The
    adapter is only loaded, not exercised, so "ok" means it was constructed
    (or already cached) without raising.

    Returns:
        A dict with:
          - "status": "ok" if get_adapter() succeeded, otherwise "degraded".
          - "service": always "link-room".
          - "adapter": the ADAPTER_TYPE environment value (default "django").
          - "adapter_loaded": True if get_adapter() succeeded.

    Adapter failures are never propagated; they are logged and reported as
    a degraded status.
    """
    adapter_type = os.getenv("ADAPTER_TYPE", "django")

    try:
        # Only success or failure matters here; the instance is not used.
        get_adapter()
    except Exception as exc:
        # A health endpoint must still answer when the adapter is broken, so
        # the failure is logged with its traceback instead of being raised.
        logging.getLogger(__name__).exception("Adapter error: %s", exc)
        adapter_ok = False
    else:
        adapter_ok = True

    return {
        "status": "ok" if adapter_ok else "degraded",
        "service": "link-room",
        "adapter": adapter_type,
        "adapter_loaded": adapter_ok,
    }
@app.get("/api/queries")
def list_queries():
    """Return the adapter's predefined queries under the "queries" key."""
    available = get_adapter().get_queries()
    return {"queries": available}
@app.get("/api/navigate")
def navigate(
    query: Optional[str] = None,
    entity: Optional[str] = None,
    id: Optional[int] = None,  # name is part of the public query string
):
    """
    Navigate data graph.

    Query mode: ?query=user_with_pets
    Navigation mode: ?entity=User&id=123

    The three parameters are forwarded unchanged to the adapter's
    navigate() method and its result is returned as-is.

    Returns:
        {
            "nodes": [...],
            "edges": [...],
            "summary": {...}
        }

    Raises:
        HTTPException(400): the adapter's navigate() raised ValueError.
        HTTPException(500): the adapter could not be loaded, or navigate()
            failed with any other exception.
    """
    logger = logging.getLogger(__name__)

    # Load the adapter separately: get_adapter() raises ValueError for a bad
    # ADAPTER_TYPE or DB_PORT, which is a server misconfiguration and must
    # not be reported to the client as a 400.
    try:
        adapter = get_adapter()
    except Exception as exc:
        logger.exception("Navigate error: %s", exc)
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    try:
        return adapter.navigate(query=query, entity=entity, id=id)
    except HTTPException:
        # Keep any HTTP error raised downstream instead of masking it as 500.
        raise
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.exception("Navigate error: %s", exc)
        # NOTE(review): detail exposes the raw exception text to the client;
        # kept for backward compatibility - confirm this is acceptable.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
if __name__ == "__main__":
    import uvicorn

    # Listening port comes from the PORT environment variable (default 8000).
    listen_port = int(os.getenv("PORT", "8000"))

    # The app is passed as the import string "main:app", with reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port, reload=True)