- Fix dashboard metrics alternating to 0 by merging partial batches in gateway before broadcasting to WebSocket clients. The aggregator sends metrics in batches of 20, causing partial updates that overwrote each other. Gateway now maintains machine_metrics_cache that accumulates metrics across batches. - Remove misleading gRPC calls counter from internals page (only incremented on health checks, not actual metric flow). Replace with cached_machines counter showing tracked machines. - Update internals.html stats panel to show Events, Broadcasts, Clients, and Machines instead of gRPC calls.
510 lines
16 KiB
Python
510 lines
16 KiB
Python
"""Gateway service - FastAPI with WebSocket for real-time dashboard."""
|
|
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import grpc
|
|
from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect
|
|
from fastapi.requests import Request
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
# Add project root to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from services.aggregator.storage import TimescaleStorage
|
|
from shared import metrics_pb2, metrics_pb2_grpc
|
|
from shared.config import get_gateway_config
|
|
from shared.events import get_subscriber
|
|
from shared.logging import setup_logging
|
|
|
|
# Global state
# Service-wide configuration (ports, service URLs, log settings) loaded once
# at import time; consumed by lifespan() and the endpoints below.
config = get_gateway_config()
# Structured logger shared by every coroutine in this module (keyword-arg
# event style, e.g. logger.info("event_name", key=value)).
logger = setup_logging(
    service_name=config.service_name,
    log_level=config.log_level,
    log_format=config.log_format,
)
|
|
|
|
|
|
# WebSocket connection manager
class ConnectionManager:
    """Manages WebSocket connections for real-time updates.

    Connections live in a plain list; all calls happen on the single asyncio
    event loop, so no locking is used.
    """

    def __init__(self):
        # Currently open client sockets; pruned on disconnect or send failure.
        self.active_connections: list["WebSocket"] = []

    async def connect(self, websocket: "WebSocket") -> None:
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info("websocket_connected", total=len(self.active_connections))

    def disconnect(self, websocket: "WebSocket") -> None:
        """Stop tracking a connection.

        Idempotent: broadcast() may already have pruned a dead socket, and the
        endpoints call disconnect() again from their ``finally`` blocks — the
        second removal must not raise ValueError.
        """
        if websocket not in self.active_connections:
            return
        self.active_connections.remove(websocket)
        logger.info("websocket_disconnected", total=len(self.active_connections))

    async def broadcast(self, message: dict) -> None:
        """Broadcast message to all connected clients.

        The payload is JSON-encoded once; clients whose send fails are
        pruned from the active list afterwards.
        """
        if not self.active_connections:
            return

        data = json.dumps(message)
        disconnected = []

        for connection in self.active_connections:
            try:
                await connection.send_text(data)
            except Exception:
                # Any send failure means the socket is unusable; drop it.
                disconnected.append(connection)

        # Clean up disconnected (silently — the endpoint's finally block will
        # call disconnect() later, which is now a safe no-op).
        for conn in disconnected:
            if conn in self.active_connections:
                self.active_connections.remove(conn)
|
|
|
|
|
|
# Broadcast fan-out to dashboard clients (fed by event_listener()).
manager = ConnectionManager()
internals_manager = ConnectionManager()  # Separate manager for internals page
# Historical-metrics store; stays None when the DB is unreachable (see lifespan()).
timescale: TimescaleStorage | None = None
# Async gRPC channel/stub to the aggregator; created in lifespan().
grpc_channel: grpc.aio.Channel | None = None
grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None

# Track recent events for internals view
recent_events: list[dict] = []  # newest first, capped at MAX_RECENT_EVENTS
MAX_RECENT_EVENTS = 100
# Monotonic counters exposed via /api/internals and /ws/internals.
service_stats = {
    "events_received": 0,
    "websocket_broadcasts": 0,
    "started_at": None,
}

# Cache of latest full metrics per machine (merges partial batches)
machine_metrics_cache: dict[str, dict] = {}
|
|
|
|
|
|
async def event_listener():
    """Background task that listens for metric events and broadcasts to WebSocket clients.

    Consumes "metrics.raw" and "alerts.*" events from the shared subscriber.
    The aggregator publishes metrics in partial batches, so per-machine
    metrics are merged into machine_metrics_cache before broadcasting —
    otherwise partial updates would overwrite each other on the dashboard.
    Runs until cancelled by lifespan() shutdown.
    """
    logger.info("event_listener_starting")

    async with get_subscriber(topics=["metrics.raw", "alerts.*"]) as subscriber:
        async for event in subscriber.consume():
            try:
                service_stats["events_received"] += 1

                # Track for internals view
                event_record = {
                    "id": event.event_id[:8],
                    "topic": event.topic,
                    "source": event.source,
                    "timestamp": event.timestamp.isoformat(),
                    "machine_id": event.payload.get("machine_id", ""),
                    "metrics_count": len(event.payload.get("metrics", {})),
                }
                # Newest first; cap the in-memory history.
                recent_events.insert(0, event_record)
                if len(recent_events) > MAX_RECENT_EVENTS:
                    recent_events.pop()

                # Merge partial batch metrics into cache
                machine_id = event.payload.get("machine_id", "")
                incoming_metrics = event.payload.get("metrics", {})

                if machine_id:
                    if machine_id not in machine_metrics_cache:
                        machine_metrics_cache[machine_id] = {}
                    # Merge new metrics into existing (accumulate across batches)
                    machine_metrics_cache[machine_id].update(incoming_metrics)

                    # Build complete payload with merged metrics
                    merged_payload = {
                        "machine_id": machine_id,
                        "hostname": event.payload.get("hostname", ""),
                        "timestamp_ms": event.payload.get("timestamp_ms", 0),
                        "metrics": machine_metrics_cache[machine_id],
                    }
                else:
                    # No machine_id (e.g. alert events): pass the payload through.
                    merged_payload = event.payload

                # Broadcast merged data to dashboard
                await manager.broadcast(
                    {
                        "type": "metrics",
                        "data": merged_payload,
                        "timestamp": event.timestamp.isoformat(),
                    }
                )
                service_stats["websocket_broadcasts"] += 1

                # Broadcast to internals (show raw event, not merged)
                await internals_manager.broadcast(
                    {
                        "type": "event",
                        "data": event_record,
                    }
                )

            except Exception as e:
                # Keep consuming: one bad event must not kill the listener.
                logger.warning("broadcast_error", error=str(e))
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup: connect to TimescaleDB (optional — failure degrades the service
    to live-stream-only), open the aggregator gRPC channel, and launch the
    background event listener. Shutdown: cancel the listener and close both
    connections in reverse order.
    """
    global timescale, grpc_channel, grpc_stub

    logger.info("gateway_starting", port=config.http_port)

    # Connect to TimescaleDB for historical queries
    timescale = TimescaleStorage(config.timescale_url)
    try:
        await timescale.connect()
    except Exception as e:
        # Historical endpoints will return 503, but live streaming still works.
        logger.warning("timescale_connection_failed", error=str(e))
        timescale = None

    # Connect to aggregator via gRPC
    grpc_channel = grpc.aio.insecure_channel(config.aggregator_url)
    grpc_stub = metrics_pb2_grpc.MetricsServiceStub(grpc_channel)

    # Start event listener in background
    listener_task = asyncio.create_task(event_listener())

    service_stats["started_at"] = datetime.utcnow().isoformat()
    logger.info("gateway_started")

    yield

    # Cleanup
    listener_task.cancel()
    try:
        await listener_task
    except asyncio.CancelledError:
        pass

    if grpc_channel:
        await grpc_channel.close()

    if timescale:
        await timescale.disconnect()

    logger.info("gateway_stopped")
|
|
|
|
|
|
# Create FastAPI app
app = FastAPI(
    title="System Monitor Gateway",
    description="Real-time system monitoring dashboard",
    version="0.1.0",
    lifespan=lifespan,
)

# Mount static files
# Asset paths resolve relative to the repo root (three levels above this module).
static_path = Path(__file__).parent.parent.parent / "web" / "static"
if static_path.exists():
    app.mount("/static", StaticFiles(directory=str(static_path)), name="static")

# Templates
# templates stays None when the directory is missing; the HTML routes then
# fall back to inline placeholder pages.
templates_path = Path(__file__).parent.parent.parent / "web" / "templates"
templates = (
    Jinja2Templates(directory=str(templates_path)) if templates_path.exists() else None
)
|
|
|
|
|
|
# ============================================================================
|
|
# Health endpoints
|
|
# ============================================================================
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: reports healthy whenever the process can serve HTTP."""
    payload = {"status": "healthy", "service": "gateway"}
    return payload
|
|
|
|
|
|
@app.get("/ready")
async def readiness_check():
    """Readiness check - verifies dependencies (aggregator, TimescaleDB)."""
    checks = {"gateway": "ok"}

    # Aggregator reachability: a cheap gRPC round-trip with a short timeout.
    try:
        if grpc_stub:
            await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
            checks["aggregator"] = "ok"
    except Exception as exc:
        checks["aggregator"] = f"error: {str(exc)}"

    # TimescaleDB is considered up when a connection pool exists.
    db_up = timescale and timescale._pool
    checks["timescaledb"] = "ok" if db_up else "not connected"

    return {"status": "ready", "checks": checks}
|
|
|
|
|
|
# ============================================================================
|
|
# REST API endpoints
|
|
# ============================================================================
|
|
|
|
|
|
@app.get("/api/machines")
async def get_machines():
    """Get current state of all machines.

    Proxies the aggregator's GetAllStates RPC and flattens each machine's
    metric list into a ``{metric_name: value}`` mapping.

    Raises:
        HTTPException: 503 when the aggregator is not connected or errors.
    """
    if not grpc_stub:
        raise HTTPException(status_code=503, detail="Aggregator not connected")

    try:
        response = await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=5.0)
    except grpc.aio.AioRpcError as e:
        # Chain the gRPC error so tracebacks show the root cause (ruff B904).
        raise HTTPException(
            status_code=503, detail=f"Aggregator error: {e.details()}"
        ) from e

    machines = [
        {
            "machine_id": state.machine_id,
            "hostname": state.hostname,
            "last_seen_ms": state.last_seen_ms,
            "health": metrics_pb2.HealthStatus.Name(state.health),
            # Flatten repeated Metric messages into name -> value.
            "metrics": {
                metrics_pb2.MetricType.Name(m.type): m.value
                for m in state.current_metrics
            },
        }
        for state in response.machines
    ]

    return {"machines": machines}
|
|
|
|
|
|
@app.get("/api/machines/{machine_id}")
async def get_machine(machine_id: str):
    """Get current state of a specific machine.

    Args:
        machine_id: Aggregator-side machine identifier (path parameter).

    Raises:
        HTTPException: 404 when the machine is unknown, 503 when the
            aggregator is not connected or errors.
    """
    if not grpc_stub:
        raise HTTPException(status_code=503, detail="Aggregator not connected")

    try:
        response = await grpc_stub.GetCurrentState(
            metrics_pb2.StateRequest(machine_id=machine_id),
            timeout=5.0,
        )
    except grpc.aio.AioRpcError as e:
        if e.code() == grpc.StatusCode.NOT_FOUND:
            raise HTTPException(status_code=404, detail="Machine not found") from e
        # Chain the gRPC error so tracebacks show the root cause (ruff B904).
        raise HTTPException(
            status_code=503, detail=f"Aggregator error: {e.details()}"
        ) from e

    # An empty machine_id in the reply also means "unknown machine".
    if not response.machine_id:
        raise HTTPException(status_code=404, detail="Machine not found")

    # Flatten repeated Metric messages into name -> value.
    metrics = {
        metrics_pb2.MetricType.Name(m.type): m.value
        for m in response.current_metrics
    }

    return {
        "machine_id": response.machine_id,
        "hostname": response.hostname,
        "last_seen_ms": response.last_seen_ms,
        "health": metrics_pb2.HealthStatus.Name(response.health),
        "metrics": metrics,
    }
|
|
|
|
|
|
@app.get("/api/metrics")
async def get_metrics(
    machine_id: str | None = Query(None),
    metric_type: str | None = Query(None),
    minutes: int = Query(60, ge=1, le=1440),
    limit: int = Query(1000, ge=1, le=10000),
):
    """Get historical metrics.

    Args:
        machine_id: Optional filter by machine.
        metric_type: Optional filter by metric name.
        minutes: Look-back window, 1 minute to 24 hours.
        limit: Maximum rows returned (1..10000).

    Raises:
        HTTPException: 503 when TimescaleDB is not connected, 500 on
            query failure.
    """
    if not timescale:
        raise HTTPException(status_code=503, detail="TimescaleDB not connected")

    # NOTE(review): naive UTC timestamps; assumes TimescaleDB rows are stored
    # naive-UTC too — confirm before migrating to datetime.now(timezone.utc).
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=minutes)

    try:
        metrics = await timescale.get_metrics(
            machine_id=machine_id,
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
        )
    except Exception as e:
        # Chain the storage error so tracebacks show the root cause (ruff B904).
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"metrics": metrics, "count": len(metrics)}
|
|
|
|
|
|
# ============================================================================
|
|
# WebSocket endpoint
|
|
# ============================================================================
|
|
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time metric updates.

    On connect, pushes one "initial" message per known machine (fetched from
    the aggregator), then idles in a ping/pong loop. Live "metrics" messages
    arrive via ConnectionManager.broadcast() from event_listener(), not from
    this coroutine.
    """
    await manager.connect(websocket)

    try:
        # Send initial state
        if grpc_stub:
            try:
                response = await grpc_stub.GetAllStates(
                    metrics_pb2.Empty(), timeout=5.0
                )

                for state in response.machines:
                    # Flatten repeated Metric messages into name -> value.
                    metrics = {}
                    for m in state.current_metrics:
                        metric_type = metrics_pb2.MetricType.Name(m.type)
                        metrics[metric_type] = m.value

                    await websocket.send_json(
                        {
                            "type": "initial",
                            "data": {
                                "machine_id": state.machine_id,
                                "hostname": state.hostname,
                                "metrics": metrics,
                            },
                        }
                    )
            except Exception as e:
                # Initial snapshot is best-effort; the live stream still runs.
                logger.warning("initial_state_error", error=str(e))

        # Keep connection alive and handle incoming messages
        while True:
            try:
                data = await websocket.receive_text()
                # Handle ping/pong or commands from client
                if data == "ping":
                    await websocket.send_text("pong")
            except WebSocketDisconnect:
                break

    finally:
        manager.disconnect(websocket)
|
|
|
|
|
|
# ============================================================================
|
|
# Internals / Debug endpoints
|
|
# ============================================================================
|
|
|
|
|
|
@app.get("/api/internals")
async def get_internals():
    """Get internal service stats and recent events."""
    # Start every dependency at "unknown" and upgrade as probes succeed.
    services: dict[str, dict] = {
        "gateway": {"status": "healthy", "started_at": service_stats["started_at"]},
        "aggregator": {"status": "unknown"},
        "redis": {"status": "unknown"},
        "timescaledb": {"status": "unknown"},
    }

    # Aggregator: probe with a cheap RPC.
    try:
        if grpc_stub:
            await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
            services["aggregator"]["status"] = "healthy"
    except Exception as exc:
        services["aggregator"]["status"] = f"error: {str(exc)[:50]}"

    # TimescaleDB: healthy when a connection pool exists.
    services["timescaledb"]["status"] = (
        "healthy" if timescale and timescale._pool else "not connected"
    )

    # Redis: inferred healthy when events are flowing through the subscriber.
    if service_stats["events_received"] > 0:
        services["redis"]["status"] = "healthy"

    stats = {
        "events_received": service_stats["events_received"],
        "websocket_broadcasts": service_stats["websocket_broadcasts"],
        "dashboard_connections": len(manager.active_connections),
        "internals_connections": len(internals_manager.active_connections),
        "cached_machines": len(machine_metrics_cache),
    }

    return {
        "stats": stats,
        "services": services,
        "recent_events": recent_events[:20],
    }
|
|
|
|
|
|
@app.websocket("/ws/internals")
async def internals_websocket(websocket: WebSocket):
    """WebSocket for real-time internals updates."""
    await internals_manager.connect(websocket)

    try:
        # Snapshot of counters and recent events, sent once on connect.
        init_payload = {
            "stats": {
                "events_received": service_stats["events_received"],
                "websocket_broadcasts": service_stats["websocket_broadcasts"],
                "cached_machines": len(machine_metrics_cache),
            },
            "recent_events": recent_events[:20],
        }
        await websocket.send_json({"type": "init", "data": init_payload})

        # Ping/pong keep-alive loop; live events arrive via broadcast().
        while True:
            try:
                message = await websocket.receive_text()
                if message == "ping":
                    await websocket.send_text("pong")
            except WebSocketDisconnect:
                break
    finally:
        internals_manager.disconnect(websocket)
|
|
|
|
|
|
# ============================================================================
|
|
# Dashboard (HTML)
|
|
# ============================================================================
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
    """Serve the dashboard HTML."""
    # Guard clause: no templates directory -> inline placeholder with links.
    if templates is None:
        return HTMLResponse(
            "<h1>Dashboard template not found</h1><p><a href='/internals'>Internals</a> | <a href='/docs'>API Docs</a></p>"
        )
    return templates.TemplateResponse("dashboard.html", {"request": request})
|
|
|
|
|
|
@app.get("/internals", response_class=HTMLResponse)
async def internals_page(request: Request):
    """Serve the internals/debug HTML page."""
    # Guard clause: fall back to a placeholder when templates are missing.
    if templates is None:
        return HTMLResponse("<h1>Internals template not found</h1>")
    return templates.TemplateResponse("internals.html", {"request": request})
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Dev entrypoint; binds all interfaces on the configured HTTP port.
    uvicorn.run(app, host="0.0.0.0", port=config.http_port)
|