"""Gateway service - FastAPI with WebSocket for real-time dashboard.""" import asyncio import json import sys from contextlib import asynccontextmanager from datetime import datetime, timedelta from pathlib import Path from typing import Any import grpc from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect from fastapi.requests import Request from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates # Add project root to path for imports sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from services.aggregator.storage import TimescaleStorage from shared import metrics_pb2, metrics_pb2_grpc from shared.config import get_gateway_config from shared.events import get_subscriber from shared.logging import setup_logging # Global state config = get_gateway_config() logger = setup_logging( service_name=config.service_name, log_level=config.log_level, log_format=config.log_format, ) # Edge forwarding config EDGE_URL = config.edge_url if hasattr(config, "edge_url") else None EDGE_API_KEY = config.edge_api_key if hasattr(config, "edge_api_key") else "" edge_ws = None # WebSocket connection manager class ConnectionManager: """Manages WebSocket connections for real-time updates.""" def __init__(self): self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket) -> None: await websocket.accept() self.active_connections.append(websocket) logger.info("websocket_connected", total=len(self.active_connections)) def disconnect(self, websocket: WebSocket) -> None: self.active_connections.remove(websocket) logger.info("websocket_disconnected", total=len(self.active_connections)) async def broadcast(self, message: dict) -> None: """Broadcast message to all connected clients.""" if not self.active_connections: return data = json.dumps(message) disconnected = [] for connection in self.active_connections: try: await connection.send_text(data) 
except Exception: disconnected.append(connection) # Clean up disconnected for conn in disconnected: try: self.active_connections.remove(conn) except ValueError: pass manager = ConnectionManager() internals_manager = ConnectionManager() # Separate manager for internals page timescale: TimescaleStorage | None = None grpc_channel: grpc.aio.Channel | None = None grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None async def connect_to_edge(): """Maintain persistent WebSocket connection to edge and forward metrics.""" global edge_ws if not EDGE_URL: logger.info("edge_not_configured", msg="No EDGE_URL set, running local only") return import websockets url = EDGE_URL if EDGE_API_KEY: separator = "&" if "?" in url else "?" url = f"{url}{separator}key={EDGE_API_KEY}" while True: try: logger.info("edge_connecting", url=EDGE_URL) async with websockets.connect(url) as ws: edge_ws = ws logger.info("edge_connected") while True: try: msg = await asyncio.wait_for(ws.recv(), timeout=30) # Ignore messages from edge (pings, etc) except asyncio.TimeoutError: await ws.ping() except asyncio.CancelledError: break except Exception as e: edge_ws = None logger.warning("edge_connection_error", error=str(e)) await asyncio.sleep(5) async def forward_to_edge(data: dict): """Forward metrics to edge if connected.""" global edge_ws if edge_ws: try: await edge_ws.send(json.dumps(data)) except Exception as e: logger.warning("edge_forward_error", error=str(e)) # Track recent events for internals view recent_events: list[dict] = [] MAX_RECENT_EVENTS = 100 service_stats = { "events_received": 0, "websocket_broadcasts": 0, "started_at": None, } # Cache of latest full metrics per machine (merges partial batches) machine_metrics_cache: dict[str, dict] = {} async def event_listener(): """Background task that listens for metric events and broadcasts to WebSocket clients.""" logger.info("event_listener_starting") async with get_subscriber(topics=["metrics.raw", "alerts.*"]) as subscriber: async for 
event in subscriber.consume(): try: service_stats["events_received"] += 1 # Track for internals view event_record = { "id": event.event_id[:8], "topic": event.topic, "source": event.source, "timestamp": event.timestamp.isoformat(), "machine_id": event.payload.get("machine_id", ""), "metrics_count": len(event.payload.get("metrics", {})), } recent_events.insert(0, event_record) if len(recent_events) > MAX_RECENT_EVENTS: recent_events.pop() # Merge partial batch metrics into cache machine_id = event.payload.get("machine_id", "") incoming_metrics = event.payload.get("metrics", {}) if machine_id: if machine_id not in machine_metrics_cache: machine_metrics_cache[machine_id] = {} # Merge new metrics into existing (accumulate across batches) machine_metrics_cache[machine_id].update(incoming_metrics) # Build complete payload with merged metrics merged_payload = { "machine_id": machine_id, "hostname": event.payload.get("hostname", ""), "timestamp_ms": event.payload.get("timestamp_ms", 0), "metrics": machine_metrics_cache[machine_id], } else: merged_payload = event.payload # Broadcast merged data to dashboard broadcast_msg = { "type": "metrics", "data": merged_payload, "timestamp": event.timestamp.isoformat(), } await manager.broadcast(broadcast_msg) service_stats["websocket_broadcasts"] += 1 # Forward to edge if connected await forward_to_edge(broadcast_msg) # Broadcast to internals (show raw event, not merged) await internals_manager.broadcast( { "type": "event", "data": event_record, } ) except Exception as e: logger.warning("broadcast_error", error=str(e)) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" global timescale, grpc_channel, grpc_stub logger.info("gateway_starting", port=config.http_port) # Connect to TimescaleDB for historical queries timescale = TimescaleStorage(config.timescale_url) try: await timescale.connect() except Exception as e: logger.warning("timescale_connection_failed", error=str(e)) timescale = None # 
Connect to aggregator via gRPC grpc_channel = grpc.aio.insecure_channel(config.aggregator_url) grpc_stub = metrics_pb2_grpc.MetricsServiceStub(grpc_channel) # Start event listener in background listener_task = asyncio.create_task(event_listener()) # Start edge connection if configured edge_task = asyncio.create_task(connect_to_edge()) service_stats["started_at"] = datetime.utcnow().isoformat() logger.info("gateway_started") yield # Cleanup listener_task.cancel() edge_task.cancel() try: await listener_task except asyncio.CancelledError: pass try: await edge_task except asyncio.CancelledError: pass if grpc_channel: await grpc_channel.close() if timescale: await timescale.disconnect() logger.info("gateway_stopped") # Create FastAPI app app = FastAPI( title="System Monitor Gateway", description="Real-time system monitoring dashboard", version="0.1.0", lifespan=lifespan, ) # Mount static files static_path = Path(__file__).parent.parent.parent / "web" / "static" if static_path.exists(): app.mount("/static", StaticFiles(directory=str(static_path)), name="static") # Templates templates_path = Path(__file__).parent.parent.parent / "web" / "templates" templates = ( Jinja2Templates(directory=str(templates_path)) if templates_path.exists() else None ) # ============================================================================ # Health endpoints # ============================================================================ @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy", "service": "gateway"} @app.get("/ready") async def readiness_check(): """Readiness check - verifies dependencies.""" checks = {"gateway": "ok"} # Check gRPC connection try: if grpc_stub: await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0) checks["aggregator"] = "ok" except Exception as e: checks["aggregator"] = f"error: {str(e)}" # Check TimescaleDB if timescale and timescale._pool: checks["timescaledb"] = "ok" else: checks["timescaledb"] = 
"not connected" return {"status": "ready", "checks": checks} # ============================================================================ # REST API endpoints # ============================================================================ @app.get("/api/machines") async def get_machines(): """Get current state of all machines.""" if not grpc_stub: raise HTTPException(status_code=503, detail="Aggregator not connected") try: response = await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=5.0) machines = [] for state in response.machines: metrics = {} for m in state.current_metrics: metric_type = metrics_pb2.MetricType.Name(m.type) metrics[metric_type] = m.value machines.append( { "machine_id": state.machine_id, "hostname": state.hostname, "last_seen_ms": state.last_seen_ms, "health": metrics_pb2.HealthStatus.Name(state.health), "metrics": metrics, } ) return {"machines": machines} except grpc.aio.AioRpcError as e: raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}") @app.get("/api/machines/{machine_id}") async def get_machine(machine_id: str): """Get current state of a specific machine.""" if not grpc_stub: raise HTTPException(status_code=503, detail="Aggregator not connected") try: response = await grpc_stub.GetCurrentState( metrics_pb2.StateRequest(machine_id=machine_id), timeout=5.0, ) if not response.machine_id: raise HTTPException(status_code=404, detail="Machine not found") metrics = {} for m in response.current_metrics: metric_type = metrics_pb2.MetricType.Name(m.type) metrics[metric_type] = m.value return { "machine_id": response.machine_id, "hostname": response.hostname, "last_seen_ms": response.last_seen_ms, "health": metrics_pb2.HealthStatus.Name(response.health), "metrics": metrics, } except grpc.aio.AioRpcError as e: if e.code() == grpc.StatusCode.NOT_FOUND: raise HTTPException(status_code=404, detail="Machine not found") raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}") 
@app.get("/api/metrics")
async def get_metrics(
    machine_id: str | None = Query(None),
    metric_type: str | None = Query(None),
    minutes: int = Query(60, ge=1, le=1440),
    limit: int = Query(1000, ge=1, le=10000),
):
    """Get historical metrics from TimescaleDB.

    Args:
        machine_id: Restrict to one machine; all machines when omitted.
        metric_type: Restrict to one metric type; all types when omitted.
        minutes: Look-back window ending now (1..1440).
        limit: Maximum number of rows to return (1..10000).

    Raises:
        HTTPException: 503 when TimescaleDB is not connected, 500 when the
            query fails.
    """
    if not timescale:
        raise HTTPException(status_code=503, detail="TimescaleDB not connected")

    # Naive UTC datetimes (datetime.utcnow); presumably what
    # TimescaleStorage.get_metrics expects - TODO confirm.
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=minutes)
    try:
        metrics = await timescale.get_metrics(
            machine_id=machine_id,
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
        )
    except Exception as e:
        # Log server-side too; otherwise the failure is only visible to the client.
        logger.warning("metrics_query_failed", error=str(e))
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"metrics": metrics, "count": len(metrics)}


# ============================================================================
# WebSocket endpoint
# ============================================================================


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time metric updates.

    On connect, sends one ``"initial"`` message per machine known to the
    aggregator (best-effort; failures are logged), then answers ``"ping"``
    with ``"pong"`` until the client disconnects. Live updates are pushed to
    this socket through ``manager.broadcast``.
    """
    await manager.connect(websocket)
    try:
        # Send initial state
        if grpc_stub:
            try:
                response = await grpc_stub.GetAllStates(
                    metrics_pb2.Empty(), timeout=5.0
                )
                for state in response.machines:
                    metrics = {
                        metrics_pb2.MetricType.Name(m.type): m.value
                        for m in state.current_metrics
                    }
                    await websocket.send_json(
                        {
                            "type": "initial",
                            "data": {
                                "machine_id": state.machine_id,
                                "hostname": state.hostname,
                                "metrics": metrics,
                            },
                        }
                    )
            except WebSocketDisconnect:
                # Client left while the snapshot was being sent: a normal
                # end of the session, not an initial-state error.
                return
            except Exception as e:
                logger.warning("initial_state_error", error=str(e))

        # Keep connection alive and handle incoming messages
        while True:
            try:
                data = await websocket.receive_text()
                # Handle ping/pong or commands from client
                if data == "ping":
                    await websocket.send_text("pong")
            except WebSocketDisconnect:
                break
    finally:
        manager.disconnect(websocket)


# ============================================================================
# Internals / Debug endpoints
# ============================================================================
@app.get("/api/internals")
async def get_internals():
    """Return internal service stats, dependency health and recent events.

    Dependency status values:
      * aggregator  - "healthy" if a 2s GetAllStates probe succeeds, a
                      truncated "error: ..." string if it raises, "unknown"
                      when no gRPC stub exists.
      * timescaledb - "healthy" when the storage pool is open, otherwise
                      "not connected".
      * redis       - "healthy" once at least one event has been received,
                      otherwise "unknown".
    """
    gateway_started = service_stats["started_at"]

    # Probe the aggregator
    aggregator_status = "unknown"
    if grpc_stub:
        try:
            await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
            aggregator_status = "healthy"
        except Exception as err:
            aggregator_status = "error: " + str(err)[:50]

    db_status = "healthy" if timescale and timescale._pool else "not connected"
    # Redis is only inferred healthy from the fact that events are arriving.
    redis_status = "healthy" if service_stats["events_received"] > 0 else "unknown"

    services = {
        "gateway": {"status": "healthy", "started_at": gateway_started},
        "aggregator": {"status": aggregator_status},
        "redis": {"status": redis_status},
        "timescaledb": {"status": db_status},
    }
    stats = {
        "events_received": service_stats["events_received"],
        "websocket_broadcasts": service_stats["websocket_broadcasts"],
        "dashboard_connections": len(manager.active_connections),
        "internals_connections": len(internals_manager.active_connections),
        "cached_machines": len(machine_metrics_cache),
    }
    return {"stats": stats, "services": services, "recent_events": recent_events[:20]}


@app.websocket("/ws/internals")
async def internals_websocket(websocket: WebSocket):
    """Stream internals updates to the debug page.

    Sends an ``"init"`` snapshot (counters plus the 20 newest events), then
    replies ``"pong"`` to every ``"ping"`` until the client disconnects.
    Subsequent events are pushed via ``internals_manager.broadcast``.
    """
    await internals_manager.connect(websocket)
    try:
        snapshot = {
            "stats": {
                "events_received": service_stats["events_received"],
                "websocket_broadcasts": service_stats["websocket_broadcasts"],
                "cached_machines": len(machine_metrics_cache),
            },
            "recent_events": recent_events[:20],
        }
        await websocket.send_json({"type": "init", "data": snapshot})

        while True:
            try:
                if await websocket.receive_text() == "ping":
                    await websocket.send_text("pong")
            except WebSocketDisconnect:
                return
    finally:
        internals_manager.disconnect(websocket)


# ============================================================================
# Dashboard (HTML)
# ============================================================================
============================================================================ @app.get("/", response_class=HTMLResponse) async def dashboard(request: Request): """Serve the dashboard HTML.""" if templates: return templates.TemplateResponse("dashboard.html", {"request": request}) return HTMLResponse( "