Fix metrics flickering and improve internals page

- Fix dashboard metrics alternating to 0 by merging partial batches
  in gateway before broadcasting to WebSocket clients. The aggregator
  sends metrics in batches of 20, causing partial updates that
  overwrote each other. Gateway now maintains machine_metrics_cache
  that accumulates metrics across batches.

- Remove misleading gRPC calls counter from internals page (only
  incremented on health checks, not actual metric flow). Replace
  with cached_machines counter showing tracked machines.

- Update internals.html stats panel to show Events, Broadcasts,
  Clients, and Machines instead of gRPC calls.
This commit is contained in:
buenosairesam
2025-12-31 02:15:57 -03:00
parent ee9cbf73ec
commit 00b1e663d9
3 changed files with 577 additions and 37 deletions

View File

@@ -72,25 +72,84 @@ class ConnectionManager:
manager = ConnectionManager()
internals_manager = ConnectionManager() # Separate manager for internals page
timescale: TimescaleStorage | None = None
grpc_channel: grpc.aio.Channel | None = None
grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None
# Track recent events for internals view
recent_events: list[dict] = []
MAX_RECENT_EVENTS = 100
service_stats = {
"events_received": 0,
"websocket_broadcasts": 0,
"started_at": None,
}
# Cache of latest full metrics per machine (merges partial batches)
machine_metrics_cache: dict[str, dict] = {}
async def event_listener():
"""Background task that listens for metric events and broadcasts to WebSocket clients."""
logger.info("event_listener_starting")
async with get_subscriber(topics=["metrics.raw"]) as subscriber:
async with get_subscriber(topics=["metrics.raw", "alerts.*"]) as subscriber:
async for event in subscriber.consume():
try:
service_stats["events_received"] += 1
# Track for internals view
event_record = {
"id": event.event_id[:8],
"topic": event.topic,
"source": event.source,
"timestamp": event.timestamp.isoformat(),
"machine_id": event.payload.get("machine_id", ""),
"metrics_count": len(event.payload.get("metrics", {})),
}
recent_events.insert(0, event_record)
if len(recent_events) > MAX_RECENT_EVENTS:
recent_events.pop()
# Merge partial batch metrics into cache
machine_id = event.payload.get("machine_id", "")
incoming_metrics = event.payload.get("metrics", {})
if machine_id:
if machine_id not in machine_metrics_cache:
machine_metrics_cache[machine_id] = {}
# Merge new metrics into existing (accumulate across batches)
machine_metrics_cache[machine_id].update(incoming_metrics)
# Build complete payload with merged metrics
merged_payload = {
"machine_id": machine_id,
"hostname": event.payload.get("hostname", ""),
"timestamp_ms": event.payload.get("timestamp_ms", 0),
"metrics": machine_metrics_cache[machine_id],
}
else:
merged_payload = event.payload
# Broadcast merged data to dashboard
await manager.broadcast(
{
"type": "metrics",
"data": event.payload,
"data": merged_payload,
"timestamp": event.timestamp.isoformat(),
}
)
service_stats["websocket_broadcasts"] += 1
# Broadcast to internals (show raw event, not merged)
await internals_manager.broadcast(
{
"type": "event",
"data": event_record,
}
)
except Exception as e:
logger.warning("broadcast_error", error=str(e))
@@ -117,6 +176,7 @@ async def lifespan(app: FastAPI):
# Start event listener in background
listener_task = asyncio.create_task(event_listener())
service_stats["started_at"] = datetime.utcnow().isoformat()
logger.info("gateway_started")
yield
@@ -341,6 +401,85 @@ async def websocket_endpoint(websocket: WebSocket):
manager.disconnect(websocket)
# ============================================================================
# Internals / Debug endpoints
# ============================================================================
@app.get("/api/internals")
async def get_internals():
"""Get internal service stats and recent events."""
# Get service health
services = {
"gateway": {"status": "healthy", "started_at": service_stats["started_at"]},
"aggregator": {"status": "unknown"},
"redis": {"status": "unknown"},
"timescaledb": {"status": "unknown"},
}
# Check aggregator
try:
if grpc_stub:
await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
services["aggregator"]["status"] = "healthy"
except Exception as e:
services["aggregator"]["status"] = f"error: {str(e)[:50]}"
# Check TimescaleDB
if timescale and timescale._pool:
services["timescaledb"]["status"] = "healthy"
else:
services["timescaledb"]["status"] = "not connected"
# Redis is healthy if we're receiving events
if service_stats["events_received"] > 0:
services["redis"]["status"] = "healthy"
return {
"stats": {
"events_received": service_stats["events_received"],
"websocket_broadcasts": service_stats["websocket_broadcasts"],
"dashboard_connections": len(manager.active_connections),
"internals_connections": len(internals_manager.active_connections),
"cached_machines": len(machine_metrics_cache),
},
"services": services,
"recent_events": recent_events[:20],
}
@app.websocket("/ws/internals")
async def internals_websocket(websocket: WebSocket):
"""WebSocket for real-time internals updates."""
await internals_manager.connect(websocket)
try:
# Send initial state
await websocket.send_json(
{
"type": "init",
"data": {
"stats": {
"events_received": service_stats["events_received"],
"websocket_broadcasts": service_stats["websocket_broadcasts"],
"cached_machines": len(machine_metrics_cache),
},
"recent_events": recent_events[:20],
},
}
)
while True:
try:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
break
finally:
internals_manager.disconnect(websocket)
# ============================================================================
# Dashboard (HTML)
# ============================================================================
@@ -351,40 +490,17 @@ async def dashboard(request: Request):
"""Serve the dashboard HTML."""
if templates:
return templates.TemplateResponse("dashboard.html", {"request": request})
return HTMLResponse(
"<h1>Dashboard template not found</h1><p><a href='/internals'>Internals</a> | <a href='/docs'>API Docs</a></p>"
)
# Fallback if templates not found
return HTMLResponse("""
<!DOCTYPE html>
<html>
<head>
<title>System Monitor</title>
<style>
body { font-family: system-ui; background: #1a1a2e; color: #eee; padding: 2rem; }
h1 { color: #e94560; }
pre { background: #16213e; padding: 1rem; border-radius: 8px; overflow: auto; }
</style>
</head>
<body>
<h1>System Monitor</h1>
<p>Dashboard template not found. API endpoints:</p>
<ul>
<li><a href="/api/machines">/api/machines</a> - Current state of all machines</li>
<li><a href="/api/metrics">/api/metrics</a> - Historical metrics</li>
<li><a href="/docs">/docs</a> - API documentation</li>
</ul>
<h2>Live Metrics</h2>
<pre id="output">Connecting...</pre>
<script>
const ws = new WebSocket(`ws://${location.host}/ws`);
const output = document.getElementById('output');
ws.onmessage = (e) => {
output.textContent = JSON.stringify(JSON.parse(e.data), null, 2);
};
ws.onclose = () => { output.textContent = 'Disconnected'; };
</script>
</body>
</html>
""")
@app.get("/internals", response_class=HTMLResponse)
async def internals_page(request: Request):
"""Serve the internals/debug HTML page."""
if templates:
return templates.TemplateResponse("internals.html", {"request": request})
return HTMLResponse("<h1>Internals template not found</h1>")
if __name__ == "__main__":