Files
sysmonstm/services/gateway/main.py
buenosairesam 00b1e663d9 Fix metrics flickering and improve internals page
- Fix dashboard metrics alternating to 0 by merging partial batches
  in gateway before broadcasting to WebSocket clients. The aggregator
  sends metrics in batches of 20, causing partial updates that
  overwrote each other. Gateway now maintains machine_metrics_cache
  that accumulates metrics across batches.

- Remove misleading gRPC calls counter from internals page (only
  incremented on health checks, not actual metric flow). Replace
  with cached_machines counter showing tracked machines.

- Update internals.html stats panel to show Events, Broadcasts,
  Clients, and Machines instead of gRPC calls.
2025-12-31 02:15:57 -03:00

510 lines
16 KiB
Python

"""Gateway service - FastAPI with WebSocket for real-time dashboard."""
import asyncio
import json
import sys
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import grpc
from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect
from fastapi.requests import Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from services.aggregator.storage import TimescaleStorage
from shared import metrics_pb2, metrics_pb2_grpc
from shared.config import get_gateway_config
from shared.events import get_subscriber
from shared.logging import setup_logging
# Global state
config = get_gateway_config()
logger = setup_logging(
service_name=config.service_name,
log_level=config.log_level,
log_format=config.log_format,
)
# WebSocket connection manager
class ConnectionManager:
"""Manages WebSocket connections for real-time updates."""
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket) -> None:
await websocket.accept()
self.active_connections.append(websocket)
logger.info("websocket_connected", total=len(self.active_connections))
def disconnect(self, websocket: WebSocket) -> None:
self.active_connections.remove(websocket)
logger.info("websocket_disconnected", total=len(self.active_connections))
async def broadcast(self, message: dict) -> None:
"""Broadcast message to all connected clients."""
if not self.active_connections:
return
data = json.dumps(message)
disconnected = []
for connection in self.active_connections:
try:
await connection.send_text(data)
except Exception:
disconnected.append(connection)
# Clean up disconnected
for conn in disconnected:
try:
self.active_connections.remove(conn)
except ValueError:
pass
manager = ConnectionManager()
internals_manager = ConnectionManager() # Separate manager for internals page
timescale: TimescaleStorage | None = None
grpc_channel: grpc.aio.Channel | None = None
grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None
# Track recent events for internals view
recent_events: list[dict] = []
MAX_RECENT_EVENTS = 100
service_stats = {
"events_received": 0,
"websocket_broadcasts": 0,
"started_at": None,
}
# Cache of latest full metrics per machine (merges partial batches)
machine_metrics_cache: dict[str, dict] = {}
async def event_listener():
"""Background task that listens for metric events and broadcasts to WebSocket clients."""
logger.info("event_listener_starting")
async with get_subscriber(topics=["metrics.raw", "alerts.*"]) as subscriber:
async for event in subscriber.consume():
try:
service_stats["events_received"] += 1
# Track for internals view
event_record = {
"id": event.event_id[:8],
"topic": event.topic,
"source": event.source,
"timestamp": event.timestamp.isoformat(),
"machine_id": event.payload.get("machine_id", ""),
"metrics_count": len(event.payload.get("metrics", {})),
}
recent_events.insert(0, event_record)
if len(recent_events) > MAX_RECENT_EVENTS:
recent_events.pop()
# Merge partial batch metrics into cache
machine_id = event.payload.get("machine_id", "")
incoming_metrics = event.payload.get("metrics", {})
if machine_id:
if machine_id not in machine_metrics_cache:
machine_metrics_cache[machine_id] = {}
# Merge new metrics into existing (accumulate across batches)
machine_metrics_cache[machine_id].update(incoming_metrics)
# Build complete payload with merged metrics
merged_payload = {
"machine_id": machine_id,
"hostname": event.payload.get("hostname", ""),
"timestamp_ms": event.payload.get("timestamp_ms", 0),
"metrics": machine_metrics_cache[machine_id],
}
else:
merged_payload = event.payload
# Broadcast merged data to dashboard
await manager.broadcast(
{
"type": "metrics",
"data": merged_payload,
"timestamp": event.timestamp.isoformat(),
}
)
service_stats["websocket_broadcasts"] += 1
# Broadcast to internals (show raw event, not merged)
await internals_manager.broadcast(
{
"type": "event",
"data": event_record,
}
)
except Exception as e:
logger.warning("broadcast_error", error=str(e))
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager."""
global timescale, grpc_channel, grpc_stub
logger.info("gateway_starting", port=config.http_port)
# Connect to TimescaleDB for historical queries
timescale = TimescaleStorage(config.timescale_url)
try:
await timescale.connect()
except Exception as e:
logger.warning("timescale_connection_failed", error=str(e))
timescale = None
# Connect to aggregator via gRPC
grpc_channel = grpc.aio.insecure_channel(config.aggregator_url)
grpc_stub = metrics_pb2_grpc.MetricsServiceStub(grpc_channel)
# Start event listener in background
listener_task = asyncio.create_task(event_listener())
service_stats["started_at"] = datetime.utcnow().isoformat()
logger.info("gateway_started")
yield
# Cleanup
listener_task.cancel()
try:
await listener_task
except asyncio.CancelledError:
pass
if grpc_channel:
await grpc_channel.close()
if timescale:
await timescale.disconnect()
logger.info("gateway_stopped")
# Create FastAPI app
app = FastAPI(
title="System Monitor Gateway",
description="Real-time system monitoring dashboard",
version="0.1.0",
lifespan=lifespan,
)
# Mount static files
static_path = Path(__file__).parent.parent.parent / "web" / "static"
if static_path.exists():
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
# Templates
templates_path = Path(__file__).parent.parent.parent / "web" / "templates"
templates = (
Jinja2Templates(directory=str(templates_path)) if templates_path.exists() else None
)
# ============================================================================
# Health endpoints
# ============================================================================
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "gateway"}
@app.get("/ready")
async def readiness_check():
"""Readiness check - verifies dependencies."""
checks = {"gateway": "ok"}
# Check gRPC connection
try:
if grpc_stub:
await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
checks["aggregator"] = "ok"
except Exception as e:
checks["aggregator"] = f"error: {str(e)}"
# Check TimescaleDB
if timescale and timescale._pool:
checks["timescaledb"] = "ok"
else:
checks["timescaledb"] = "not connected"
return {"status": "ready", "checks": checks}
# ============================================================================
# REST API endpoints
# ============================================================================
@app.get("/api/machines")
async def get_machines():
"""Get current state of all machines."""
if not grpc_stub:
raise HTTPException(status_code=503, detail="Aggregator not connected")
try:
response = await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=5.0)
machines = []
for state in response.machines:
metrics = {}
for m in state.current_metrics:
metric_type = metrics_pb2.MetricType.Name(m.type)
metrics[metric_type] = m.value
machines.append(
{
"machine_id": state.machine_id,
"hostname": state.hostname,
"last_seen_ms": state.last_seen_ms,
"health": metrics_pb2.HealthStatus.Name(state.health),
"metrics": metrics,
}
)
return {"machines": machines}
except grpc.aio.AioRpcError as e:
raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}")
@app.get("/api/machines/{machine_id}")
async def get_machine(machine_id: str):
"""Get current state of a specific machine."""
if not grpc_stub:
raise HTTPException(status_code=503, detail="Aggregator not connected")
try:
response = await grpc_stub.GetCurrentState(
metrics_pb2.StateRequest(machine_id=machine_id),
timeout=5.0,
)
if not response.machine_id:
raise HTTPException(status_code=404, detail="Machine not found")
metrics = {}
for m in response.current_metrics:
metric_type = metrics_pb2.MetricType.Name(m.type)
metrics[metric_type] = m.value
return {
"machine_id": response.machine_id,
"hostname": response.hostname,
"last_seen_ms": response.last_seen_ms,
"health": metrics_pb2.HealthStatus.Name(response.health),
"metrics": metrics,
}
except grpc.aio.AioRpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
raise HTTPException(status_code=404, detail="Machine not found")
raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}")
@app.get("/api/metrics")
async def get_metrics(
machine_id: str | None = Query(None),
metric_type: str | None = Query(None),
minutes: int = Query(60, ge=1, le=1440),
limit: int = Query(1000, ge=1, le=10000),
):
"""Get historical metrics."""
if not timescale:
raise HTTPException(status_code=503, detail="TimescaleDB not connected")
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=minutes)
try:
metrics = await timescale.get_metrics(
machine_id=machine_id,
metric_type=metric_type,
start_time=start_time,
end_time=end_time,
limit=limit,
)
return {"metrics": metrics, "count": len(metrics)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# WebSocket endpoint
# ============================================================================
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time metric updates."""
await manager.connect(websocket)
try:
# Send initial state
if grpc_stub:
try:
response = await grpc_stub.GetAllStates(
metrics_pb2.Empty(), timeout=5.0
)
for state in response.machines:
metrics = {}
for m in state.current_metrics:
metric_type = metrics_pb2.MetricType.Name(m.type)
metrics[metric_type] = m.value
await websocket.send_json(
{
"type": "initial",
"data": {
"machine_id": state.machine_id,
"hostname": state.hostname,
"metrics": metrics,
},
}
)
except Exception as e:
logger.warning("initial_state_error", error=str(e))
# Keep connection alive and handle incoming messages
while True:
try:
data = await websocket.receive_text()
# Handle ping/pong or commands from client
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
finally:
manager.disconnect(websocket)
# ============================================================================
# Internals / Debug endpoints
# ============================================================================
@app.get("/api/internals")
async def get_internals():
"""Get internal service stats and recent events."""
# Get service health
services = {
"gateway": {"status": "healthy", "started_at": service_stats["started_at"]},
"aggregator": {"status": "unknown"},
"redis": {"status": "unknown"},
"timescaledb": {"status": "unknown"},
}
# Check aggregator
try:
if grpc_stub:
await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
services["aggregator"]["status"] = "healthy"
except Exception as e:
services["aggregator"]["status"] = f"error: {str(e)[:50]}"
# Check TimescaleDB
if timescale and timescale._pool:
services["timescaledb"]["status"] = "healthy"
else:
services["timescaledb"]["status"] = "not connected"
# Redis is healthy if we're receiving events
if service_stats["events_received"] > 0:
services["redis"]["status"] = "healthy"
return {
"stats": {
"events_received": service_stats["events_received"],
"websocket_broadcasts": service_stats["websocket_broadcasts"],
"dashboard_connections": len(manager.active_connections),
"internals_connections": len(internals_manager.active_connections),
"cached_machines": len(machine_metrics_cache),
},
"services": services,
"recent_events": recent_events[:20],
}
@app.websocket("/ws/internals")
async def internals_websocket(websocket: WebSocket):
"""WebSocket for real-time internals updates."""
await internals_manager.connect(websocket)
try:
# Send initial state
await websocket.send_json(
{
"type": "init",
"data": {
"stats": {
"events_received": service_stats["events_received"],
"websocket_broadcasts": service_stats["websocket_broadcasts"],
"cached_machines": len(machine_metrics_cache),
},
"recent_events": recent_events[:20],
},
}
)
while True:
try:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
finally:
internals_manager.disconnect(websocket)
# ============================================================================
# Dashboard (HTML)
# ============================================================================
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
"""Serve the dashboard HTML."""
if templates:
return templates.TemplateResponse("dashboard.html", {"request": request})
return HTMLResponse(
"<h1>Dashboard template not found</h1><p><a href='/internals'>Internals</a> | <a href='/docs'>API Docs</a></p>"
)
@app.get("/internals", response_class=HTMLResponse)
async def internals_page(request: Request):
"""Serve the internals/debug HTML page."""
if templates:
return templates.TemplateResponse("internals.html", {"request": request})
return HTMLResponse("<h1>Internals template not found</h1>")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=config.http_port)