new three layer deployment
This commit is contained in:
151
ctrl/hub/hub.py
Normal file
151
ctrl/hub/hub.py
Normal file
@@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
sysmonstm hub - Local aggregator that receives from collectors and forwards to edge.
|
||||
|
||||
Runs on the local network, receives metrics from collectors via WebSocket,
|
||||
and forwards them to the cloud edge.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
|
||||
|
||||
# Settings are read once from the environment at import time; each one
# falls back to the default given here when the variable is unset.
API_KEY = os.getenv("API_KEY", "")  # empty string disables the collector key check
EDGE_URL = os.getenv("EDGE_URL", "")  # e.g., wss://sysmonstm.mcrn.ar/ws
EDGE_API_KEY = os.getenv("EDGE_API_KEY", "")  # appended to EDGE_URL as the "key" query parameter
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

# Root logger configuration; an unrecognised LOG_LEVEL name falls back to INFO.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
)
log = logging.getLogger("hub")

app = FastAPI(title="sysmonstm-hub")

# In-memory state shared by the handlers below.
collector_connections: list[WebSocket] = []  # currently accepted collector sockets
machines: dict = {}  # machine_id -> most recent metrics message
edge_ws = None  # live edge websocket, or None when there is no edge connection
|
||||
|
||||
|
||||
async def connect_to_edge():
    """Maintain a persistent websocket connection to the edge.

    Returns immediately when ``EDGE_URL`` is empty (local-only mode).
    Otherwise loops forever: connects, publishes the socket in the
    module-level ``edge_ws`` so ``forward_to_edge`` can use it, and keeps the
    link alive by sending a ping whenever nothing has been received for
    30 seconds. Any connection error is logged and followed by a retry
    after 5 seconds.

    Raises:
        asyncio.CancelledError: re-raised when the surrounding task is
            cancelled, so the loop actually stops instead of reconnecting.
    """
    global edge_ws

    if not EDGE_URL:
        log.info("No EDGE_URL configured, running in local-only mode")
        return

    # Imported lazily so the dependency is only needed when an edge is configured.
    import websockets

    url = EDGE_URL
    if EDGE_API_KEY:
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}key={EDGE_API_KEY}"

    while True:
        try:
            # Log EDGE_URL rather than url so the API key never reaches the logs.
            log.info(f"Connecting to edge: {EDGE_URL}")
            async with websockets.connect(url) as ws:
                edge_ws = ws
                log.info("Connected to edge")
                try:
                    while True:
                        try:
                            # Incoming edge messages are read and discarded.
                            await asyncio.wait_for(ws.recv(), timeout=30)
                        except asyncio.TimeoutError:
                            await ws.ping()
                finally:
                    # Whatever ended the session, stop advertising a socket
                    # that is about to be closed.
                    edge_ws = None
        except asyncio.CancelledError:
            # Cancellation must propagate; swallowing it would make the outer
            # loop reconnect and leave the task impossible to stop.
            log.info("Edge connection task cancelled")
            raise
        except Exception as e:
            edge_ws = None
            log.warning(f"Edge connection error: {e}. Reconnecting in 5s...")
            await asyncio.sleep(5)
|
||||
|
||||
|
||||
async def forward_to_edge(data: dict):
    """Send ``data`` to the edge as JSON text if an edge socket is registered.

    Does nothing when ``edge_ws`` is unset. Serialization and send failures
    are logged at warning level and not propagated to the caller.
    """
    upstream = edge_ws
    if not upstream:
        return

    try:
        payload = json.dumps(data)
        await upstream.send(payload)
        log.debug(f"Forwarded to edge: {data.get('machine_id')}")
    except Exception as exc:
        log.warning(f"Failed to forward to edge: {exc}")
|
||||
|
||||
|
||||
@app.on_event("startup")
async def startup():
    """Start the edge connection loop as a background task on app startup."""
    # The event loop holds only weak references to tasks, so an unreferenced
    # task can be garbage-collected mid-execution. Keeping it on app.state
    # pins it for the lifetime of the application.
    app.state.edge_task = asyncio.create_task(connect_to_edge())
|
||||
|
||||
|
||||
@app.get("/health")
async def health():
    """Report liveness plus counts of known machines and connected collectors.

    ``edge_connected`` is true whenever ``edge_ws`` is set.
    """
    report = {"status": "ok"}
    report["machines"] = len(machines)
    report["collectors"] = len(collector_connections)
    report["edge_connected"] = edge_ws is not None
    return report
|
||||
|
||||
|
||||
@app.get("/api/machines")
async def get_machines():
    """Return the latest metrics message stored for each machine, keyed by machine id."""
    return machines
|
||||
|
||||
|
||||
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, key: str = Query(default="")):
    """Accept a collector connection and ingest its metrics messages.

    When ``API_KEY`` is set, the ``key`` query parameter must match it;
    otherwise the socket is closed with code 4001 before being accepted.
    Each received text frame is parsed as JSON. Messages whose ``type`` is
    ``"metrics"`` are stored in ``machines`` under their ``machine_id``
    (``"unknown"`` if absent) and forwarded to the edge. If nothing arrives
    for 30 seconds, a ``{"type": "ping"}`` message is sent to the collector.

    Frames that are not valid JSON, or whose top-level value is not an
    object, are logged and skipped so that one bad message does not drop
    the collector connection.
    """
    import hmac

    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Encoding to bytes keeps non-ASCII input from raising.
    if API_KEY and not hmac.compare_digest(
        key.encode("utf-8"), API_KEY.encode("utf-8")
    ):
        log.warning(f"Invalid API key from {websocket.client}")
        await websocket.close(code=4001, reason="Invalid API key")
        return

    await websocket.accept()
    collector_connections.append(websocket)
    client = websocket.client.host if websocket.client else "unknown"
    log.info(f"Collector connected: {client}")

    try:
        while True:
            try:
                msg = await asyncio.wait_for(websocket.receive_text(), timeout=30)
            except asyncio.TimeoutError:
                # Idle collector: send an application-level ping and keep waiting.
                await websocket.send_json({"type": "ping"})
                continue

            try:
                data = json.loads(msg)
            except json.JSONDecodeError as e:
                log.warning(f"Ignoring malformed JSON from {client}: {e}")
                continue

            if not isinstance(data, dict):
                log.warning(f"Ignoring non-object message from {client}")
                continue

            if data.get("type") == "metrics":
                machine_id = data.get("machine_id", "unknown")
                machines[machine_id] = data
                log.debug(f"Metrics from {machine_id}: cpu={data.get('cpu')}%")

                # Forward to edge
                await forward_to_edge(data)

    except WebSocketDisconnect:
        log.info(f"Collector disconnected: {client}")
    except Exception as e:
        log.error(f"WebSocket error: {e}")
    finally:
        # Always deregister, whichever way the session ended.
        if websocket in collector_connections:
            collector_connections.remove(websocket)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    key_status = "configured" if API_KEY else "not set (open)"
    edge_status = EDGE_URL or "not configured (local only)"

    log.info("Starting sysmonstm hub")
    log.info(f" API key: {key_status}")
    log.info(f" Edge URL: {edge_status}")

    # Listen on all interfaces on port 8080.
    uvicorn.run(app, host="0.0.0.0", port=8080)
|
||||
Reference in New Issue
Block a user