"""Gateway service - FastAPI with WebSocket for real-time dashboard.""" import asyncio import json import sys from contextlib import asynccontextmanager from datetime import datetime, timedelta from pathlib import Path from typing import Any import grpc from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi.requests import Request from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates # Add project root to path for imports sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from services.aggregator.storage import TimescaleStorage from shared import metrics_pb2, metrics_pb2_grpc from shared.config import get_gateway_config from shared.events import get_subscriber from shared.logging import setup_logging # Global state config = get_gateway_config() logger = setup_logging( service_name=config.service_name, log_level=config.log_level, log_format=config.log_format, ) # WebSocket connection manager class ConnectionManager: """Manages WebSocket connections for real-time updates.""" def __init__(self): self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket) -> None: await websocket.accept() self.active_connections.append(websocket) logger.info("websocket_connected", total=len(self.active_connections)) def disconnect(self, websocket: WebSocket) -> None: self.active_connections.remove(websocket) logger.info("websocket_disconnected", total=len(self.active_connections)) async def broadcast(self, message: dict) -> None: """Broadcast message to all connected clients.""" if not self.active_connections: return data = json.dumps(message) disconnected = [] for connection in self.active_connections: try: await connection.send_text(data) except Exception: disconnected.append(connection) # Clean up disconnected for conn in disconnected: try: self.active_connections.remove(conn) except ValueError: pass manager = ConnectionManager() timescale: TimescaleStorage | None = None grpc_channel: grpc.aio.Channel | None = None grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None async def event_listener(): """Background task that listens for metric events and broadcasts to WebSocket clients.""" logger.info("event_listener_starting") async with get_subscriber(topics=["metrics.raw"]) as subscriber: async for event in subscriber.consume(): try: await manager.broadcast( { "type": "metrics", "data": event.payload, "timestamp": event.timestamp.isoformat(), } ) except Exception as e: logger.warning("broadcast_error", error=str(e)) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" global timescale, grpc_channel, grpc_stub logger.info("gateway_starting", port=config.http_port) # Connect to TimescaleDB for historical queries timescale = TimescaleStorage(config.timescale_url) try: await timescale.connect() except Exception as e: logger.warning("timescale_connection_failed", error=str(e)) timescale = None # Connect to aggregator via gRPC grpc_channel = grpc.aio.insecure_channel(config.aggregator_url) grpc_stub = metrics_pb2_grpc.MetricsServiceStub(grpc_channel) # Start event listener in background listener_task = asyncio.create_task(event_listener()) logger.info("gateway_started") yield # Cleanup listener_task.cancel() try: await listener_task except asyncio.CancelledError: pass if grpc_channel: await grpc_channel.close() if timescale: await timescale.disconnect() logger.info("gateway_stopped") # Create FastAPI app app = FastAPI( title="System Monitor Gateway", description="Real-time system monitoring dashboard", version="0.1.0", lifespan=lifespan, ) # Mount static files static_path = Path(__file__).parent.parent.parent / "web" / "static" if static_path.exists(): app.mount("/static", StaticFiles(directory=str(static_path)), name="static") # Templates templates_path = Path(__file__).parent.parent.parent / "web" / "templates" templates = ( Jinja2Templates(directory=str(templates_path)) if templates_path.exists() else None ) # ============================================================================ # Health endpoints # ============================================================================ @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "service": "gateway"} @app.get("/ready") async def readiness_check(): """Readiness check - verifies dependencies.""" checks = {"gateway": "ok"} # Check gRPC connection try: if grpc_stub: await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0) checks["aggregator"] = "ok" except Exception as e: checks["aggregator"] = f"error: {str(e)}" # Check TimescaleDB if timescale and timescale._pool: checks["timescaledb"] = "ok" else: checks["timescaledb"] = "not connected" return {"status": "ready", "checks": checks} # ============================================================================ # REST API endpoints # ============================================================================ @app.get("/api/machines") async def get_machines(): """Get current state of all machines.""" if not grpc_stub: raise HTTPException(status_code=503, detail="Aggregator not connected") try: response = await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=5.0) machines = [] for state in response.machines: metrics = {} for m in state.current_metrics: metric_type = metrics_pb2.MetricType.Name(m.type) metrics[metric_type] = m.value machines.append( { "machine_id": state.machine_id, "hostname": state.hostname, "last_seen_ms": state.last_seen_ms, "health": metrics_pb2.HealthStatus.Name(state.health), "metrics": metrics, } ) return {"machines": machines} except grpc.aio.AioRpcError as e: raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}") @app.get("/api/machines/{machine_id}") async def get_machine(machine_id: str): """Get current state of a specific machine.""" if not grpc_stub: raise HTTPException(status_code=503, detail="Aggregator not connected") try: response = await grpc_stub.GetCurrentState( metrics_pb2.StateRequest(machine_id=machine_id), timeout=5.0, ) if not response.machine_id: raise HTTPException(status_code=404, detail="Machine not found") metrics = {} for m in response.current_metrics: metric_type = metrics_pb2.MetricType.Name(m.type) metrics[metric_type] = m.value return { "machine_id": response.machine_id, "hostname": response.hostname, "last_seen_ms": response.last_seen_ms, "health": metrics_pb2.HealthStatus.Name(response.health), "metrics": metrics, } except grpc.aio.AioRpcError as e: if e.code() == grpc.StatusCode.NOT_FOUND: raise HTTPException(status_code=404, detail="Machine not found") raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}") @app.get("/api/metrics") async def get_metrics( machine_id: str | None = Query(None), metric_type: str | None = Query(None), minutes: int = Query(60, ge=1, le=1440), limit: int = Query(1000, ge=1, le=10000), ): """Get historical metrics.""" if not timescale: raise HTTPException(status_code=503, detail="TimescaleDB not connected") end_time = datetime.utcnow() start_time = end_time - timedelta(minutes=minutes) try: metrics = await timescale.get_metrics( machine_id=machine_id, metric_type=metric_type, start_time=start_time, end_time=end_time, limit=limit, ) return {"metrics": metrics, "count": len(metrics)} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # WebSocket endpoint # ============================================================================ @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time metric updates.""" await manager.connect(websocket) try: # Send initial state if grpc_stub: try: response = await grpc_stub.GetAllStates( metrics_pb2.Empty(), timeout=5.0 ) for state in response.machines: metrics = {} for m in state.current_metrics: metric_type = metrics_pb2.MetricType.Name(m.type) metrics[metric_type] = m.value await websocket.send_json( { "type": "initial", "data": { "machine_id": state.machine_id, "hostname": state.hostname, "metrics": metrics, }, } ) except Exception as e: logger.warning("initial_state_error", error=str(e)) # Keep connection alive and handle incoming messages while True: try: data = await websocket.receive_text() # Handle ping/pong or commands from client if data == "ping": await websocket.send_text("pong") except WebSocketDisconnect: break finally: manager.disconnect(websocket) # ============================================================================ # Dashboard (HTML) # ============================================================================ @app.get("/", response_class=HTMLResponse) async def dashboard(request: Request): """Serve the dashboard HTML.""" if templates: return templates.TemplateResponse("dashboard.html", {"request": request}) # Fallback if templates not found return HTMLResponse(""" System Monitor

System Monitor

Dashboard template not found. API endpoints:

Live Metrics

Connecting...
""") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=config.http_port)