407 lines
12 KiB
Python
407 lines
12 KiB
Python
"""
|
|
Databrowse Monitor - Test-oriented data navigation.
|
|
|
|
Navigate users, entities, and data to find the right test scenario.
|
|
Room-agnostic: configure via environment or cfg/<room>/databrowse/depot/
|
|
|
|
Run standalone:
|
|
python main.py
|
|
|
|
Or use uvicorn:
|
|
uvicorn main:app --port 12020 --reload
|
|
"""
|
|
|
|
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
|
|
|
|
# FastAPI application object; the route handlers in this module register
# themselves on it through the @app.get decorators.
app = FastAPI(title="Databrowse Monitor", version="0.1.0")

# Paths
# Directory that contains this module.
BASE_DIR = Path(__file__).parent
# Directory holding the depot JSON files. The DATABROWSE_DEPOT environment
# variable overrides the default of <BASE_DIR>/depot.
DEPOT_DIR = Path(os.getenv("DATABROWSE_DEPOT", str(BASE_DIR / "depot")))
# Jinja2 templates are looked up directly in BASE_DIR (next to this module).
templates = Jinja2Templates(directory=str(BASE_DIR))
|
|
|
|
# =============================================================================
# ROOM CONFIG - Pluggable environment targeting
# =============================================================================
# Default room: local development database
# Override with env vars or future room selector UI

# All values are read from the environment once, when this module is imported.
# The second argument of each os.getenv call is the fallback used when the
# variable is not set.
ROOM_CONFIG = {
    # Room name reported by /health and passed to the templates.
    "name": os.getenv("ROOM_NAME", "local"),
    # Connection parameters consumed by get_db_url().
    "db": {
        "host": os.getenv("DB_HOST", "localhost"),
        # Stays a string; it is only ever interpolated into the URL.
        "port": os.getenv("DB_PORT", "5432"),
        "name": os.getenv("DB_NAME", "database"),
        "user": os.getenv("DB_USER", "user"),
        "password": os.getenv("DB_PASSWORD", ""),
    },
}
|
|
|
|
|
|
def get_db_url() -> str:
    """Build the PostgreSQL connection URL from the room config.

    The user name and password are percent-encoded before being placed in
    the URL. Without this, credentials containing URL-reserved characters
    (``@``, ``:``, ``/``, ``%`` ...) would be split at the wrong place when
    the URL is parsed. ``quote(..., safe="")`` is used rather than
    ``quote_plus`` so that a space becomes ``%20`` and never ``+``.

    Returns:
        A ``postgresql://user:password@host:port/name`` URL string.
    """
    db = ROOM_CONFIG["db"]
    user = quote(db["user"], safe="")
    password = quote(db["password"], safe="")
    return f"postgresql://{user}:{password}@{db['host']}:{db['port']}/{db['name']}"
|
|
|
|
|
|
# =============================================================================
# DEPOT DATA - Load from JSON files
# =============================================================================

# Module-level caches for the three depot documents. None means "not loaded
# yet"; get_schema() / get_views() / get_scenarios() fill them on first call.
_schema: Optional[Dict[str, Any]] = None
_views: Optional[Dict[str, Any]] = None
_scenarios: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
def load_depot_file(filename: str) -> Dict[str, Any]:
    """Load a JSON file from the depot directory.

    Args:
        filename: File name, resolved relative to ``DEPOT_DIR``.

    Returns:
        The parsed JSON document, or an empty dict when the file does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    path = DEPOT_DIR / filename
    try:
        # JSON is UTF-8 by specification; do not depend on the locale's
        # default encoding. Opening directly (instead of exists() + open())
        # also avoids the window where the file vanishes between the two.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
|
|
|
|
|
|
def get_schema() -> Dict[str, Any]:
    """Return the data model schema, reading ``schema.json`` on first use."""
    global _schema
    if _schema is not None:
        # Already loaded (an empty dict from a missing file is cached too).
        return _schema
    _schema = load_depot_file("schema.json")
    return _schema
|
|
|
|
|
|
def get_views() -> Dict[str, Any]:
    """Return the view configurations, reading ``views.json`` on first use."""
    global _views
    if _views is not None:
        # Already loaded (an empty dict from a missing file is cached too).
        return _views
    _views = load_depot_file("views.json")
    return _views
|
|
|
|
|
|
def get_scenarios() -> Dict[str, Any]:
    """Return the test scenario definitions, reading ``scenarios.json`` on first use."""
    global _scenarios
    if _scenarios is not None:
        # Already loaded (an empty dict from a missing file is cached too).
        return _scenarios
    _scenarios = load_depot_file("scenarios.json")
    return _scenarios
|
|
|
|
|
|
# =============================================================================
|
|
# DATABASE
|
|
# =============================================================================
|
|
|
|
# Process-wide engine; stays None until get_engine() creates it successfully.
_engine: Optional[Engine] = None


def get_engine() -> Optional[Engine]:
    """Return the shared database engine, creating it on first use.

    Returns:
        The cached engine, or None when engine creation raised. A failure is
        not cached, so the next call tries again.
    """
    global _engine
    if _engine is not None:
        return _engine
    try:
        _engine = create_engine(get_db_url(), pool_pre_ping=True)
    except Exception as e:
        # Best effort: callers treat a missing engine as "no data".
        print(f"[databrowse] DB engine error: {e}")
    return _engine
|
|
|
|
|
|
# =============================================================================
|
|
# SQL MODE - Direct database queries
|
|
# =============================================================================
|
|
|
|
|
|
def _sql_literal(value: Any) -> str:
    """Render a filter value as a single-quoted SQL string literal."""
    # Double embedded single quotes so the value cannot end the literal early.
    return "'" + str(value).replace("'", "''") + "'"


def _filter_clause(table: str, field: str, value: Any) -> str:
    """Build one WHERE predicate for a single entry of a view's filter."""
    column = f"{table}.{field}"
    if value is None:
        # JSON null: comparing against the text 'None' would never match.
        return f"{column} IS NULL"
    if isinstance(value, list):
        if not value:
            # "IN ()" is a syntax error; an empty list matches no row.
            return "1 = 0"
        return f"{column} IN ({', '.join(_sql_literal(v) for v in value)})"
    return f"{column} = {_sql_literal(value)}"


def build_view_query(view_name: str) -> Optional[str]:
    """Build SQL query from view configuration.

    This is the SQL mode - generates queries directly from view definitions.

    Filter values are emitted as escaped string literals. Identifiers and
    raw SQL fragments (table, column and field names, computed ``sql``
    expressions, ``order_by``) are interpolated verbatim from the depot
    files, so those files must be trusted.

    Args:
        view_name: Slug or name of the view to look up.

    Returns:
        The SELECT statement, or None when the view is unknown, is not in
        ``sql`` mode, or its entity has no ``table`` in the schema.
    """
    view = next(
        (
            v
            for v in get_views().get("views", [])
            if v.get("slug") == view_name or v.get("name") == view_name
        ),
        None,
    )
    if not view:
        return None

    if view.get("mode") != "sql":
        return None  # Only SQL mode supported for now

    # .get() so a view without an "entity" key yields None instead of KeyError.
    entity_def = get_schema().get("definitions", {}).get(view.get("entity"), {})
    table = entity_def.get("table")
    if not table:
        return None

    # Build field list; fields unknown to the schema are skipped.
    properties = entity_def.get("properties", {})
    computed = entity_def.get("computed", {})
    field_list = []
    for field_name in view.get("fields", []):
        if field_name in properties:
            col = properties[field_name].get("column", field_name)
            field_list.append(f"{table}.{col} as {field_name}")
        elif field_name in computed:
            sql_expr = computed[field_name].get("sql", "NULL")
            field_list.append(f"({sql_expr}) as {field_name}")

    # Add computed group field if specified. It is skipped when the group
    # definition has no "sql", which would otherwise render as "(None)".
    group_by = view.get("group_by")
    computed_group = view.get("computed_group", {}).get(group_by)
    group_sql = computed_group.get("sql") if computed_group else None
    if group_sql:
        field_list.append(f"({group_sql}) as {group_by}")

    # Build query
    query = f"SELECT {', '.join(field_list)} FROM {table}"

    # Add filters
    where_clauses = [
        _filter_clause(table, field, value)
        for field, value in view.get("filter", {}).items()
    ]
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)

    # Add ordering
    order_by = view.get("order_by")
    if order_by:
        query += f" ORDER BY {order_by}"

    return query
|
|
|
|
|
|
def execute_view_query(view_name: str) -> List[Dict[str, Any]]:
    """Run the SQL generated for a view and return its rows as dicts.

    Returns:
        One dict per row, keyed by result column name. An empty list is
        returned when no engine is available, when no query could be built
        for the view, or when executing the query raised.
    """
    engine = get_engine()
    if not engine:
        return []

    query_sql = build_view_query(view_name)
    if not query_sql:
        return []

    records: List[Dict[str, Any]] = []
    try:
        with engine.connect() as connection:
            cursor = connection.execute(text(query_sql))
            fetched = cursor.fetchall()
            names = cursor.keys()
            for values in fetched:
                records.append(dict(zip(names, values)))
    except Exception as e:
        # Best effort: log the failing statement and report "no data".
        print(f"[databrowse] Query error for view '{view_name}': {e}")
        print(f"[databrowse] Query was: {query_sql}")
        return []
    return records
|
|
|
|
|
|
def group_results_by(
    results: List[Dict[str, Any]],
    group_field: str,
    labels: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Group results by a field value.

    Args:
        results: Rows to group.
        group_field: Key whose value decides a row's group. Rows that lack
            the key go into the ``"unknown"`` group.
        labels: Optional mapping from group key to display label. Keys
            without an entry are labelled with the key itself.

    Returns:
        ``{key: {"label": ..., "items": [rows...]}}`` with groups in order of
        first appearance and rows in their original order.
    """
    labels = labels or {}
    grouped: Dict[Any, List[Dict[str, Any]]] = {}
    for row in results:
        grouped.setdefault(row.get(group_field, "unknown"), []).append(row)

    return {
        key: {"label": labels.get(key, key), "items": items}
        for key, items in grouped.items()
    }
|
|
|
|
|
|
# =============================================================================
|
|
# API MODE - Placeholder for backend integration
|
|
# =============================================================================
|
|
|
|
|
|
def fetch_via_api(view_name: str) -> List[Dict[str, Any]]:
    """Fetch a view's data through the backend API (API mode).

    Not implemented yet: every call returns an empty list, whatever
    ``view_name`` is.
    """
    # TODO: Implement API mode
    # Will call endpoints like /api/v1/databrowse/{view_name}/
    # with auth token from room config
    rows: List[Dict[str, Any]] = []
    return rows
|
|
|
|
|
|
# =============================================================================
|
|
# ROUTES
|
|
# =============================================================================
|
|
|
|
|
|
@app.get("/health")
def health():
    """Health check.

    Probes the database with ``SELECT 1`` and reports the depot state.

    Returns:
        A dict whose ``status`` is ``"ok"`` when the probe succeeded and
        ``"degraded"`` otherwise, plus room, database and depot details.
    """
    engine = get_engine()
    db_ok = False
    if engine:
        try:
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
                db_ok = True
        except Exception as e:
            # A failed probe is reported as "degraded", never raised.
            # Catching Exception (not a bare except) lets SystemExit and
            # KeyboardInterrupt propagate; the reason is logged, not hidden.
            print(f"[databrowse] Health check DB error: {e}")

    return {
        "status": "ok" if db_ok else "degraded",
        "service": "databrowse-monitor",
        "room": ROOM_CONFIG["name"],
        "database": "connected" if db_ok else "disconnected",
        "depot": {
            "path": str(DEPOT_DIR),
            "schema": "loaded" if get_schema() else "missing",
            "views": len(get_views().get("views", [])),
            "scenarios": len(get_scenarios().get("scenarios", [])),
        },
    }
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
    """Render the landing page listing the configured views and scenarios."""
    context = {
        "request": request,
        "views": get_views().get("views", []),
        "scenarios": get_scenarios().get("scenarios", []),
        "room_name": ROOM_CONFIG["name"],
    }
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
|
@app.get("/view/{view_slug}", response_class=HTMLResponse)
def view_data(request: Request, view_slug: str):
    """Render the HTML page for one view, identified by its slug.

    Responds with a 404 "View not found" page when no view has that slug.
    """
    view_config = next(
        (v for v in get_views().get("views", []) if v.get("slug") == view_slug),
        None,
    )
    if not view_config:
        return HTMLResponse("View not found", status_code=404)

    # Fetch rows according to the view's mode; unknown modes yield no rows.
    mode = view_config.get("mode", "sql")
    results = []
    if mode == "sql":
        results = execute_view_query(view_slug)
    elif mode == "api":
        results = fetch_via_api(view_slug)

    # Without a group_by, everything goes into a single "all" bucket.
    group_by = view_config.get("group_by")
    if not group_by:
        grouped_results = {"all": {"label": "All Results", "items": results}}
    else:
        group_spec = view_config.get("computed_group", {}).get(group_by, {})
        grouped_results = group_results_by(
            results, group_by, group_spec.get("labels", {})
        )

    context = {
        "request": request,
        "view": view_config,
        "results": results,
        "grouped_results": grouped_results,
        "total": len(results),
        "room_name": ROOM_CONFIG["name"],
    }
    return templates.TemplateResponse("view.html", context)
|
|
|
|
|
|
@app.get("/api/views")
def api_views():
    """Return the whole view configuration document as JSON."""
    views_document = get_views()
    return views_document
|
|
|
|
|
|
@app.get("/api/view/{view_slug}")
def api_view_data(view_slug: str):
    """Return one view's configuration and rows as JSON.

    Responds with a 404 ``{"error": "View not found"}`` body when no view
    has the given slug.
    """
    view_config = next(
        (v for v in get_views().get("views", []) if v.get("slug") == view_slug),
        None,
    )
    if not view_config:
        return JSONResponse({"error": "View not found"}, status_code=404)

    # Fetch rows according to the view's mode; unknown modes yield no rows.
    mode = view_config.get("mode", "sql")
    results = []
    if mode == "sql":
        results = execute_view_query(view_slug)
    elif mode == "api":
        results = fetch_via_api(view_slug)

    return {
        "view": view_config,
        "results": results,
        "total": len(results),
        "room": ROOM_CONFIG["name"],
    }
|
|
|
|
|
|
@app.get("/api/schema")
def api_schema():
    """Return the data model schema document as JSON."""
    schema_document = get_schema()
    return schema_document
|
|
|
|
|
|
@app.get("/api/scenarios")
def api_scenarios():
    """Return the test scenario definitions document as JSON."""
    scenarios_document = get_scenarios()
    return scenarios_document
|
|
|
|
|
|
# =============================================================================
# MAIN
# =============================================================================

# Standalone entry point ("python main.py"): start uvicorn for this app.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        # Import string rather than the app object; assumes this module is
        # importable as "main" - TODO confirm if the file is ever renamed.
        "main:app",
        host="0.0.0.0",
        # The PORT environment variable overrides the default port 12020.
        port=int(os.getenv("PORT", "12020")),
        reload=True,
    )
|