Files
soleprint/station/monitors/databrowse/main.py
2025-12-31 08:34:18 -03:00

407 lines
12 KiB
Python

"""
Databrowse Monitor - Test-oriented data navigation.
Navigate users, entities, and data to find the right test scenario.
Room-agnostic: configure via environment or cfg/<room>/databrowse/depot/
Run standalone:
python main.py
Or use uvicorn:
uvicorn main:app --port 12020 --reload
"""
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
app = FastAPI(title="Databrowse Monitor", version="0.1.0")
# Paths
BASE_DIR = Path(__file__).parent
DEPOT_DIR = Path(os.getenv("DATABROWSE_DEPOT", str(BASE_DIR / "depot")))
templates = Jinja2Templates(directory=str(BASE_DIR))
# =============================================================================
# ROOM CONFIG - Pluggable environment targeting
# =============================================================================
# Default room: local development database
# Override with env vars or future room selector UI
ROOM_CONFIG = {
"name": os.getenv("ROOM_NAME", "local"),
"db": {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", "5432"),
"name": os.getenv("DB_NAME", "database"),
"user": os.getenv("DB_USER", "user"),
"password": os.getenv("DB_PASSWORD", ""),
},
}
def get_db_url() -> str:
"""Build database URL from room config."""
db = ROOM_CONFIG["db"]
return f"postgresql://{db['user']}:{db['password']}@{db['host']}:{db['port']}/{db['name']}"
# =============================================================================
# DEPOT DATA - Load from JSON files
# =============================================================================
_schema: Optional[Dict[str, Any]] = None
_views: Optional[Dict[str, Any]] = None
_scenarios: Optional[Dict[str, Any]] = None
def load_depot_file(filename: str) -> Dict[str, Any]:
"""Load JSON file from depot directory."""
path = DEPOT_DIR / filename
if not path.exists():
return {}
with open(path) as f:
return json.load(f)
def get_schema() -> Dict[str, Any]:
"""Get data model schema."""
global _schema
if _schema is None:
_schema = load_depot_file("schema.json")
return _schema
def get_views() -> Dict[str, Any]:
"""Get view configurations."""
global _views
if _views is None:
_views = load_depot_file("views.json")
return _views
def get_scenarios() -> Dict[str, Any]:
"""Get test scenario definitions."""
global _scenarios
if _scenarios is None:
_scenarios = load_depot_file("scenarios.json")
return _scenarios
# =============================================================================
# DATABASE
# =============================================================================
_engine: Optional[Engine] = None
def get_engine() -> Optional[Engine]:
"""Get or create database engine (lazy singleton)."""
global _engine
if _engine is None:
try:
_engine = create_engine(get_db_url(), pool_pre_ping=True)
except Exception as e:
print(f"[databrowse] DB engine error: {e}")
return _engine
# =============================================================================
# SQL MODE - Direct database queries
# =============================================================================
def build_view_query(view_name: str) -> Optional[str]:
"""Build SQL query from view configuration.
This is the SQL mode - generates queries directly from view definitions.
"""
views = get_views()
view = None
for v in views.get("views", []):
if v.get("slug") == view_name or v.get("name") == view_name:
view = v
break
if not view:
return None
if view.get("mode") != "sql":
return None # Only SQL mode supported for now
schema = get_schema()
entity_def = schema.get("definitions", {}).get(view["entity"], {})
table = entity_def.get("table")
if not table:
return None
# Build field list
fields = view.get("fields", [])
field_list = []
for field_name in fields:
# Check if it's a regular field or computed field
if field_name in entity_def.get("properties", {}):
col = entity_def["properties"][field_name].get("column", field_name)
field_list.append(f"{table}.{col} as {field_name}")
elif field_name in entity_def.get("computed", {}):
computed_def = entity_def["computed"][field_name]
sql_expr = computed_def.get("sql", "NULL")
field_list.append(f"({sql_expr}) as {field_name}")
# Add computed group field if specified
group_by = view.get("group_by")
computed_group = view.get("computed_group", {}).get(group_by)
if computed_group:
group_sql = computed_group.get("sql")
field_list.append(f"({group_sql}) as {group_by}")
# Build query
query = f"SELECT {', '.join(field_list)} FROM {table}"
# Add filters
filters = view.get("filter", {})
where_clauses = []
for field, value in filters.items():
if isinstance(value, list):
# IN clause
values_str = ", ".join([f"'{v}'" for v in value])
where_clauses.append(f"{table}.{field} IN ({values_str})")
else:
where_clauses.append(f"{table}.{field} = '{value}'")
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
# Add ordering
order_by = view.get("order_by")
if order_by:
query += f" ORDER BY {order_by}"
return query
def execute_view_query(view_name: str) -> List[Dict[str, Any]]:
"""Execute view query and return results."""
engine = get_engine()
if not engine:
return []
query_sql = build_view_query(view_name)
if not query_sql:
return []
try:
with engine.connect() as conn:
result = conn.execute(text(query_sql))
rows = result.fetchall()
columns = result.keys()
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"[databrowse] Query error for view '{view_name}': {e}")
print(f"[databrowse] Query was: {query_sql}")
return []
def group_results_by(
results: List[Dict[str, Any]], group_field: str, labels: Dict[str, str] = None
) -> Dict[str, Any]:
"""Group results by a field value."""
grouped = {}
for row in results:
key = row.get(group_field, "unknown")
if key not in grouped:
grouped[key] = []
grouped[key].append(row)
# Add labels if provided
if labels:
return {
key: {"label": labels.get(key, key), "items": items}
for key, items in grouped.items()
}
return {key: {"label": key, "items": items} for key, items in grouped.items()}
# =============================================================================
# API MODE - Placeholder for backend integration
# =============================================================================
def fetch_via_api(view_name: str) -> List[Dict[str, Any]]:
"""Fetch data via backend API.
This is the API mode - will call backend endpoints.
Currently placeholder.
"""
# TODO: Implement API mode
# Will call endpoints like /api/v1/databrowse/{view_name}/
# with auth token from room config
return []
# =============================================================================
# ROUTES
# =============================================================================
@app.get("/health")
def health():
"""Health check."""
engine = get_engine()
db_ok = False
if engine:
try:
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_ok = True
except:
pass
return {
"status": "ok" if db_ok else "degraded",
"service": "databrowse-monitor",
"room": ROOM_CONFIG["name"],
"database": "connected" if db_ok else "disconnected",
"depot": {
"path": str(DEPOT_DIR),
"schema": "loaded" if get_schema() else "missing",
"views": len(get_views().get("views", [])),
"scenarios": len(get_scenarios().get("scenarios", [])),
},
}
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
"""Main data browser landing page."""
views = get_views().get("views", [])
scenarios = get_scenarios().get("scenarios", [])
return templates.TemplateResponse(
"index.html",
{
"request": request,
"views": views,
"scenarios": scenarios,
"room_name": ROOM_CONFIG["name"],
},
)
@app.get("/view/{view_slug}", response_class=HTMLResponse)
def view_data(request: Request, view_slug: str):
"""Display data for a specific view."""
views_data = get_views()
view_config = None
for v in views_data.get("views", []):
if v.get("slug") == view_slug:
view_config = v
break
if not view_config:
return HTMLResponse("View not found", status_code=404)
# Execute query based on mode
mode = view_config.get("mode", "sql")
if mode == "sql":
results = execute_view_query(view_slug)
elif mode == "api":
results = fetch_via_api(view_slug)
else:
results = []
# Group results if specified
group_by = view_config.get("group_by")
if group_by:
computed_group = view_config.get("computed_group", {}).get(group_by, {})
labels = computed_group.get("labels", {})
grouped_results = group_results_by(results, group_by, labels)
else:
grouped_results = {"all": {"label": "All Results", "items": results}}
return templates.TemplateResponse(
"view.html",
{
"request": request,
"view": view_config,
"results": results,
"grouped_results": grouped_results,
"total": len(results),
"room_name": ROOM_CONFIG["name"],
},
)
@app.get("/api/views")
def api_views():
"""List all available views."""
return get_views()
@app.get("/api/view/{view_slug}")
def api_view_data(view_slug: str):
"""Get data for a specific view as JSON."""
views_data = get_views()
view_config = None
for v in views_data.get("views", []):
if v.get("slug") == view_slug:
view_config = v
break
if not view_config:
return JSONResponse({"error": "View not found"}, status_code=404)
# Execute query
mode = view_config.get("mode", "sql")
if mode == "sql":
results = execute_view_query(view_slug)
elif mode == "api":
results = fetch_via_api(view_slug)
else:
results = []
return {
"view": view_config,
"results": results,
"total": len(results),
"room": ROOM_CONFIG["name"],
}
@app.get("/api/schema")
def api_schema():
"""Get data model schema."""
return get_schema()
@app.get("/api/scenarios")
def api_scenarios():
"""Get test scenario definitions."""
return get_scenarios()
# =============================================================================
# MAIN
# =============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=int(os.getenv("PORT", "12020")),
reload=True,
)