diff --git a/services/aggregator/storage.py b/services/aggregator/storage.py index 1ea44a8..f6b0685 100644 --- a/services/aggregator/storage.py +++ b/services/aggregator/storage.py @@ -38,10 +38,21 @@ class RedisStorage: metrics: dict[str, float], timestamp_ms: int, ) -> None: - """Update the current state for a machine.""" + """Update the current state for a machine (merges metrics, doesn't replace).""" if not self._client: raise RuntimeError("Not connected to Redis") + key = f"machine:{machine_id}" + + # Get existing state to merge metrics + existing_data = await self._client.hget(key, "state") + if existing_data: + existing_state = json.loads(existing_data) + existing_metrics = existing_state.get("metrics", {}) + # Merge new metrics into existing (new values override old) + existing_metrics.update(metrics) + metrics = existing_metrics + state = { "machine_id": machine_id, "hostname": hostname, @@ -51,7 +62,6 @@ class RedisStorage: } # Store as hash for efficient partial reads - key = f"machine:{machine_id}" await self._client.hset( key, mapping={ diff --git a/services/gateway/main.py b/services/gateway/main.py index 0f66c57..dbcf2dc 100644 --- a/services/gateway/main.py +++ b/services/gateway/main.py @@ -72,25 +72,84 @@ class ConnectionManager: manager = ConnectionManager() +internals_manager = ConnectionManager() # Separate manager for internals page timescale: TimescaleStorage | None = None grpc_channel: grpc.aio.Channel | None = None grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None +# Track recent events for internals view +recent_events: list[dict] = [] +MAX_RECENT_EVENTS = 100 +service_stats = { + "events_received": 0, + "websocket_broadcasts": 0, + "started_at": None, +} + +# Cache of latest full metrics per machine (merges partial batches) +machine_metrics_cache: dict[str, dict] = {} + async def event_listener(): """Background task that listens for metric events and broadcasts to WebSocket clients.""" logger.info("event_listener_starting") - async with get_subscriber(topics=["metrics.raw"]) as subscriber: + async with get_subscriber(topics=["metrics.raw", "alerts.*"]) as subscriber: async for event in subscriber.consume(): try: + service_stats["events_received"] += 1 + + # Track for internals view + event_record = { + "id": event.event_id[:8], + "topic": event.topic, + "source": event.source, + "timestamp": event.timestamp.isoformat(), + "machine_id": event.payload.get("machine_id", ""), + "metrics_count": len(event.payload.get("metrics", {})), + } + recent_events.insert(0, event_record) + if len(recent_events) > MAX_RECENT_EVENTS: + recent_events.pop() + + # Merge partial batch metrics into cache + machine_id = event.payload.get("machine_id", "") + incoming_metrics = event.payload.get("metrics", {}) + + if machine_id: + if machine_id not in machine_metrics_cache: + machine_metrics_cache[machine_id] = {} + # Merge new metrics into existing (accumulate across batches) + machine_metrics_cache[machine_id].update(incoming_metrics) + + # Build complete payload with merged metrics + merged_payload = { + "machine_id": machine_id, + "hostname": event.payload.get("hostname", ""), + "timestamp_ms": event.payload.get("timestamp_ms", 0), + "metrics": machine_metrics_cache[machine_id], + } + else: + merged_payload = event.payload + + # Broadcast merged data to dashboard await manager.broadcast( { "type": "metrics", - "data": event.payload, + "data": merged_payload, "timestamp": event.timestamp.isoformat(), } ) + service_stats["websocket_broadcasts"] += 1 + + # Broadcast to internals (show raw event, not merged) + await internals_manager.broadcast( + { + "type": "event", + "data": event_record, + } + ) + except Exception as e: logger.warning("broadcast_error", error=str(e)) @@ -117,6 +176,7 @@ async def lifespan(app: FastAPI): # Start event listener in background listener_task = asyncio.create_task(event_listener()) + service_stats["started_at"] = datetime.utcnow().isoformat() logger.info("gateway_started") yield @@ -341,6 +401,85 @@ async def websocket_endpoint(websocket: WebSocket): manager.disconnect(websocket) +# ============================================================================ +# Internals / Debug endpoints +# ============================================================================ + + +@app.get("/api/internals") +async def get_internals(): + """Get internal service stats and recent events.""" + # Get service health + services = { + "gateway": {"status": "healthy", "started_at": service_stats["started_at"]}, + "aggregator": {"status": "unknown"}, + "redis": {"status": "unknown"}, + "timescaledb": {"status": "unknown"}, + } + + # Check aggregator + try: + if grpc_stub: + await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0) + services["aggregator"]["status"] = "healthy" + except Exception as e: + services["aggregator"]["status"] = f"error: {str(e)[:50]}" + + # Check TimescaleDB + if timescale and timescale._pool: + services["timescaledb"]["status"] = "healthy" + else: + services["timescaledb"]["status"] = "not connected" + + # Redis is healthy if we're receiving events + if service_stats["events_received"] > 0: + services["redis"]["status"] = "healthy" + + return { + "stats": { + "events_received": service_stats["events_received"], + "websocket_broadcasts": service_stats["websocket_broadcasts"], + "dashboard_connections": len(manager.active_connections), + "internals_connections": len(internals_manager.active_connections), + "cached_machines": len(machine_metrics_cache), + }, + "services": services, + "recent_events": recent_events[:20], + } + + +@app.websocket("/ws/internals") +async def internals_websocket(websocket: WebSocket): + """WebSocket for real-time internals updates.""" + await internals_manager.connect(websocket) + + try: + # Send initial state + await websocket.send_json( + { + "type": "init", + "data": { + "stats": { + "events_received": service_stats["events_received"], + "websocket_broadcasts": service_stats["websocket_broadcasts"], + "cached_machines": len(machine_metrics_cache), + }, + "recent_events": recent_events[:20], + }, + } + ) + + while True: + try: + data = await websocket.receive_text() + if data == "ping": + await websocket.send_text("pong") + except WebSocketDisconnect: + break + finally: + internals_manager.disconnect(websocket) + + # ============================================================================ # Dashboard (HTML) # ============================================================================ @@ -351,40 +490,17 @@ async def dashboard(request: Request): """Serve the dashboard HTML.""" if templates: return templates.TemplateResponse("dashboard.html", {"request": request}) + return HTMLResponse( + "

Dashboard template not found

Internals | API Docs

" + ) - # Fallback if templates not found - return HTMLResponse(""" - - - - System Monitor - - - -

System Monitor

-

Dashboard template not found. API endpoints:

- -

Live Metrics

-
Connecting...
- - - - """) + +@app.get("/internals", response_class=HTMLResponse) +async def internals_page(request: Request): + """Serve the internals/debug HTML page.""" + if templates: + return templates.TemplateResponse("internals.html", {"request": request}) + return HTMLResponse("

Internals template not found

") if __name__ == "__main__": diff --git a/web/templates/internals.html b/web/templates/internals.html new file mode 100644 index 0000000..1d176d7 --- /dev/null +++ b/web/templates/internals.html @@ -0,0 +1,414 @@ + + + + + + System Monitor - Internals + + + +
+

>_ System Monitor Internals

+
+
+ + Connecting... +
+ +
+
+ +
+
+ + +
+
+ Event Stream + +
+
+
+ Waiting for events... +
+
+
+
+
+ + + +