From 82c4551e71b4ed5bbd235ba902c5b3c0d2163f27 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Thu, 22 Jan 2026 16:22:15 -0300 Subject: [PATCH] simple is better --- CLAUDE.md | 155 +++++++++++++----------------------- ctrl/README.md | 104 +++++++++++------------- ctrl/collector/Dockerfile | 16 ---- ctrl/collector/collector.py | 136 ------------------------------- ctrl/hub/Dockerfile | 16 ---- ctrl/hub/docker-compose.yml | 12 --- ctrl/hub/hub.py | 151 ----------------------------------- services/gateway/main.py | 77 ++++++++++++++++-- shared/config.py | 4 + 9 files changed, 178 insertions(+), 493 deletions(-) delete mode 100644 ctrl/collector/Dockerfile delete mode 100644 ctrl/collector/collector.py delete mode 100644 ctrl/hub/Dockerfile delete mode 100644 ctrl/hub/docker-compose.yml delete mode 100644 ctrl/hub/hub.py diff --git a/CLAUDE.md b/CLAUDE.md index 3e3e8cc..8853b52 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,131 +2,90 @@ ## Project Overview -A real-time system monitoring platform that streams metrics from multiple machines to a central hub with live web dashboard. Built to demonstrate production microservices patterns (gRPC, FastAPI, streaming, event-driven architecture) while solving a real problem: monitoring development infrastructure across multiple machines. +A real-time system monitoring platform that streams metrics from multiple machines to a central hub with live web dashboard. Built to demonstrate production microservices patterns (gRPC, FastAPI, streaming, event-driven architecture). 
-**Primary Goal:** Portfolio project demonstrating real-time streaming architecture -**Secondary Goal:** Actually useful tool for monitoring multi-machine development environment -**Status:** Working MVP, deployed at sysmonstm.mcrn.ar +**Primary Goal:** Portfolio project demonstrating real-time streaming with gRPC +**Status:** Working, deployed at sysmonstm.mcrn.ar -## Deployment Modes - -### Production (3-tier) +## Architecture ``` -┌─────────────┐ ┌─────────────┐ ┌─────────────┐ -│ Collector │────▶│ Hub │────▶│ Edge │ -│ (each host) │ │ (local) │ │ (AWS) │ -└─────────────┘ └─────────────┘ └─────────────┘ +┌─────────────┐ ┌─────────────────────────────────────┐ ┌─────────────┐ +│ Collector │────▶│ Aggregator + Gateway + Redis + TS │────▶│ Edge │────▶ Browser +│ (mcrn) │gRPC │ (LOCAL) │ WS │ (AWS) │ WS +└─────────────┘ └─────────────────────────────────────┘ └─────────────┘ +┌─────────────┐ │ +│ Collector │────────────────────┘ +│ (nfrt) │gRPC +└─────────────┘ ``` -- **Collector** (`ctrl/collector/`) - Lightweight agent on each monitored machine -- **Hub** (`ctrl/hub/`) - Local aggregator, receives from collectors, forwards to edge -- **Edge** (`ctrl/edge/`) - Cloud dashboard, public-facing - -### Development (Full Stack) - -```bash -docker compose up # Uses ctrl/dev/docker-compose.yml -``` - -- Full gRPC-based microservices architecture -- Services: aggregator, gateway, collector, alerts -- Storage: Redis (hot), TimescaleDB (historical) +- **Collectors** (`services/collector/`) - gRPC clients on each monitored machine +- **Aggregator** (`services/aggregator/`) - gRPC server, stores in Redis/TimescaleDB +- **Gateway** (`services/gateway/`) - FastAPI, bridges gRPC to WebSocket, forwards to edge +- **Edge** (`ctrl/edge/`) - Simple WebSocket relay for AWS, serves public dashboard ## Directory Structure ``` sms/ -├── services/ # gRPC-based microservices (dev stack) +├── services/ # gRPC-based microservices │ ├── collector/ # gRPC client, streams to aggregator │ ├── 
aggregator/ # gRPC server, stores in Redis/TimescaleDB -│ ├── gateway/ # FastAPI, bridges gRPC to WebSocket +│ ├── gateway/ # FastAPI, WebSocket, forwards to edge │ └── alerts/ # Event subscriber for threshold alerts │ ├── ctrl/ # Deployment configurations -│ ├── collector/ # Lightweight WebSocket collector -│ ├── hub/ # Local aggregator -│ ├── edge/ # Cloud dashboard -│ └── dev/ # Full stack docker-compose +│ ├── dev/ # Full stack docker-compose +│ └── edge/ # Cloud dashboard (AWS) │ ├── proto/ # Protocol Buffer definitions -├── shared/ # Shared Python modules -├── web/ # Dashboard templates and static files -├── infra/ # Terraform for AWS deployment -└── k8s/ # Kubernetes manifests +├── shared/ # Shared Python modules (config, logging, events) +└── web/ # Dashboard templates and static files ``` -## Current Setup +## Running -**Machines being monitored:** -- `mcrn` - Primary workstation (runs hub + collector) -- `nfrt` - Secondary machine (runs collector only) - -**Topology:** +### Local Development +```bash +docker compose up ``` -mcrn nfrt AWS -├── hub ◄─────────────────── collector edge (sysmonstm.mcrn.ar) -│ │ ▲ -│ └────────────────────────────────────────────────┘ -└── collector + +### With Edge Forwarding (to AWS) +```bash +EDGE_URL=wss://sysmonstm.mcrn.ar/ws docker compose up +``` + +### Collector on Remote Machine +```bash +docker run -d --network host \ + -e AGGREGATOR_URL=:50051 \ + -e MACHINE_ID=$(hostname) \ + registry.mcrn.ar/sysmonstm/collector:latest ``` ## Technical Stack -### Core Technologies -- **Python 3.11+** - Primary language -- **FastAPI** - Web gateway, REST endpoints, WebSocket streaming -- **gRPC** - Inter-service communication (dev stack) -- **WebSockets** - Production deployment communication -- **psutil** - System metrics collection +- **Python 3.11+** +- **gRPC** - Collector to aggregator communication (showcased) +- **FastAPI** - Gateway REST/WebSocket +- **Redis** - Pub/Sub events, current state cache +- **TimescaleDB** - 
Historical metrics storage +- **WebSocket** - Gateway to edge, edge to browser -### Storage (Dev Stack Only) -- **PostgreSQL/TimescaleDB** - Time-series historical data -- **Redis** - Current state, caching, event pub/sub +## Key Files -### Infrastructure -- **Docker Compose** - Orchestration -- **Woodpecker CI** - Build pipeline at ppl/pipelines/sysmonstm/ -- **Registry** - registry.mcrn.ar/sysmonstm/ +| File | Purpose | +|------|---------| +| `proto/metrics.proto` | gRPC service and message definitions | +| `services/collector/main.py` | gRPC streaming client | +| `services/aggregator/main.py` | gRPC server, metric processing | +| `services/gateway/main.py` | WebSocket bridge, edge forwarding | +| `ctrl/edge/edge.py` | Simple WebSocket relay for AWS | -## Images +## Portfolio Talking Points -| Image | Purpose | -|-------|---------| -| `collector` | Lightweight WebSocket collector for production | -| `hub` | Local aggregator for production | -| `edge` | Cloud dashboard for production | -| `aggregator` | gRPC aggregator (dev stack) | -| `gateway` | FastAPI gateway (dev stack) | -| `collector-grpc` | gRPC collector (dev stack) | -| `alerts` | Alert service (dev stack) | - -## Development Guidelines - -### Code Quality -- Type hints throughout (Python 3.11+ syntax) -- Async/await patterns consistently -- Logging (not print statements) -- Error handling at boundaries - -### Docker -- Multi-stage builds for smaller images -- `--network host` for collectors (accurate network metrics) - -### Configuration -- Environment variables for all config -- Sensible defaults -- No secrets in code - -## Interview/Portfolio Talking Points - -### Architecture Decisions -- "3-tier for production: collector → hub → edge" -- "Hub allows local aggregation and buffering before forwarding to cloud" -- "Edge terminology shows awareness of edge computing patterns" -- "Full gRPC stack for development demonstrates microservices patterns" - -### Trade-offs -- Production vs Dev: simplicity/cost 
vs full architecture demo -- WebSocket vs gRPC: browser compatibility vs efficiency -- In-memory vs persistent: operational simplicity vs durability +- **gRPC streaming** - Efficient binary protocol for real-time metrics +- **Event-driven** - Redis Pub/Sub decouples processing from delivery +- **Edge pattern** - Heavy processing local, lightweight relay in cloud +- **Cost optimization** - ~$10/mo for public dashboard (data transfer, not requests) diff --git a/ctrl/README.md b/ctrl/README.md index b999053..f7bde1f 100644 --- a/ctrl/README.md +++ b/ctrl/README.md @@ -1,82 +1,72 @@ # Deployment Configurations -This directory contains deployment configurations for sysmonstm. - ## Architecture ``` -┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ -│ Collector │────▶│ Hub │────▶│ Edge │────▶│ Browser │ -│ (mcrn) │ │ (local) │ │ (AWS) │ │ │ -└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ -┌─────────────┐ │ -│ Collector │────────────┘ -│ (nfrt) │ +┌─────────────┐ ┌─────────────────────────────────────┐ ┌─────────────┐ +│ Collector │────▶│ Aggregator + Gateway + Redis + TS │────▶│ Edge │────▶ Browser +│ (mcrn) │gRPC │ (LOCAL) │ WS │ (AWS) │ WS +└─────────────┘ └─────────────────────────────────────┘ └─────────────┘ +┌─────────────┐ │ +│ Collector │────────────────────┘ +│ (nfrt) │gRPC └─────────────┘ ``` +- **Collectors** use gRPC to stream metrics to the local aggregator +- **Gateway** forwards to edge via WebSocket (if `EDGE_URL` configured) +- **Edge** (AWS) relays to browsers via WebSocket + ## Directory Structure ``` ctrl/ -├── collector/ # Lightweight agent for each monitored machine -├── hub/ # Local aggregator (receives from collectors, forwards to edge) -├── edge/ # Cloud dashboard (public-facing, receives from hub) -└── dev/ # Full gRPC stack for development +├── dev/ # Full stack for local development (docker-compose) +└── edge/ # Cloud dashboard for AWS deployment ``` -## Production Deployment (3-tier) - -### 1. 
Edge (AWS) -Public-facing dashboard that receives metrics from hub. - -```bash -cd ctrl/edge -docker compose up -d -``` - -### 2. Hub (Local Server) -Runs on your local network, receives from collectors, forwards to edge. - -```bash -cd ctrl/hub -EDGE_URL=wss://sysmonstm.mcrn.ar/ws EDGE_API_KEY=xxx docker compose up -d -``` - -### 3. Collectors (Each Machine) -Run on each machine you want to monitor. - -```bash -docker run -d --name sysmonstm-collector --network host \ - -e HUB_URL=ws://hub-machine:8080/ws \ - -e MACHINE_ID=$(hostname) \ - -e API_KEY=xxx \ - registry.mcrn.ar/sysmonstm/collector:latest -``` - -## Development (Full Stack) - -For local development with the complete gRPC-based architecture: +## Local Development ```bash # From repo root docker compose up ``` -This runs: aggregator, gateway, collector, alerts, redis, timescaledb +Runs: aggregator, gateway, collector, alerts, redis, timescaledb + +## Production Deployment + +### 1. Deploy Edge to AWS + +```bash +cd ctrl/edge +docker compose up -d +``` + +### 2. Run Full Stack Locally with Edge Forwarding + +```bash +EDGE_URL=wss://sysmonstm.mcrn.ar/ws EDGE_API_KEY=xxx docker compose up +``` + +### 3. 
Run Collectors on Other Machines + +```bash +docker run -d --name sysmonstm-collector --network host \ + -e AGGREGATOR_URL=<aggregator-host>:50051 \ + -e MACHINE_ID=$(hostname) \ + registry.mcrn.ar/sysmonstm/collector:latest +``` ## Environment Variables -### Collector -- `HUB_URL` - WebSocket URL of hub (default: ws://localhost:8080/ws) -- `MACHINE_ID` - Identifier for this machine (default: hostname) -- `API_KEY` - Authentication key -- `INTERVAL` - Seconds between collections (default: 5) - -### Hub -- `API_KEY` - Key required from collectors -- `EDGE_URL` - WebSocket URL of edge (optional, for forwarding) -- `EDGE_API_KEY` - Key for authenticating to edge +### Gateway (for edge forwarding) +- `EDGE_URL` - WebSocket URL of edge (e.g., wss://sysmonstm.mcrn.ar/ws) +- `EDGE_API_KEY` - Authentication key for edge ### Edge -- `API_KEY` - Key required from hub +- `API_KEY` - Key required from gateway + +### Collector +- `AGGREGATOR_URL` - gRPC URL of aggregator (e.g., localhost:50051) +- `MACHINE_ID` - Identifier for this machine diff --git a/ctrl/collector/Dockerfile b/ctrl/collector/Dockerfile deleted file mode 100644 index d9556c8..0000000 --- a/ctrl/collector/Dockerfile +++ /dev/null @@ -1,16 +0,0 @@ -FROM python:3.11-slim - -WORKDIR /app - -RUN pip install --no-cache-dir psutil websockets - -COPY collector.py . 
- -# Default environment variables -ENV HUB_URL=ws://localhost:8080/ws -ENV MACHINE_ID="" -ENV API_KEY="" -ENV INTERVAL=5 -ENV LOG_LEVEL=INFO - -CMD ["python", "collector.py"] diff --git a/ctrl/collector/collector.py b/ctrl/collector/collector.py deleted file mode 100644 index 68a10a9..0000000 --- a/ctrl/collector/collector.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python3 -"""Lightweight WebSocket metrics collector for sysmonstm standalone deployment.""" - -import asyncio -import json -import logging -import os -import socket -import time - -import psutil - -# Configuration from environment -HUB_URL = os.environ.get("HUB_URL", "ws://localhost:8080/ws") -MACHINE_ID = os.environ.get("MACHINE_ID", socket.gethostname()) -API_KEY = os.environ.get("API_KEY", "") -INTERVAL = int(os.environ.get("INTERVAL", "5")) -LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") - -# Logging setup -logging.basicConfig( - level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) -log = logging.getLogger("collector") - - -def collect_metrics() -> dict: - """Collect system metrics using psutil.""" - metrics = { - "type": "metrics", - "machine_id": MACHINE_ID, - "hostname": socket.gethostname(), - "timestamp": time.time(), - } - - # CPU - try: - metrics["cpu"] = psutil.cpu_percent(interval=None) - except Exception: - pass - - # Memory - try: - mem = psutil.virtual_memory() - metrics["memory"] = mem.percent - metrics["memory_used_gb"] = round(mem.used / (1024**3), 2) - metrics["memory_total_gb"] = round(mem.total / (1024**3), 2) - except Exception: - pass - - # Disk - try: - disk = psutil.disk_usage("/") - metrics["disk"] = disk.percent - metrics["disk_used_gb"] = round(disk.used / (1024**3), 2) - metrics["disk_total_gb"] = round(disk.total / (1024**3), 2) - except Exception: - pass - - # Load average (Unix only) - try: - load1, load5, load15 = psutil.getloadavg() - metrics["load_1m"] = round(load1, 2) 
- metrics["load_5m"] = round(load5, 2) - metrics["load_15m"] = round(load15, 2) - except (AttributeError, OSError): - pass - - # Network connections count - try: - metrics["connections"] = len(psutil.net_connections(kind="inet")) - except (psutil.AccessDenied, PermissionError): - pass - - # Process count - try: - metrics["processes"] = len(psutil.pids()) - except Exception: - pass - - return metrics - - -async def run_collector(): - """Main collector loop with auto-reconnect.""" - import websockets - - # Build URL with API key if provided - url = HUB_URL - if API_KEY: - separator = "&" if "?" in url else "?" - url = f"{url}{separator}key={API_KEY}" - - # Prime CPU percent (first call always returns 0) - psutil.cpu_percent(interval=None) - - while True: - try: - log.info(f"Connecting to {HUB_URL}...") - async with websockets.connect(url) as ws: - log.info( - f"Connected. Sending metrics every {INTERVAL}s as '{MACHINE_ID}'" - ) - - while True: - metrics = collect_metrics() - await ws.send(json.dumps(metrics)) - log.debug( - f"Sent: cpu={metrics.get('cpu', '?')}% mem={metrics.get('memory', '?')}% disk={metrics.get('disk', '?')}%" - ) - await asyncio.sleep(INTERVAL) - - except asyncio.CancelledError: - log.info("Collector stopped") - break - except Exception as e: - log.warning(f"Connection error: {e}. Reconnecting in 5s...") - await asyncio.sleep(5) - - -def main(): - log.info("sysmonstm collector starting") - log.info(f" Hub: {HUB_URL}") - log.info(f" Machine: {MACHINE_ID}") - log.info(f" Interval: {INTERVAL}s") - - try: - asyncio.run(run_collector()) - except KeyboardInterrupt: - log.info("Stopped") - - -if __name__ == "__main__": - main() diff --git a/ctrl/hub/Dockerfile b/ctrl/hub/Dockerfile deleted file mode 100644 index 2488bb5..0000000 --- a/ctrl/hub/Dockerfile +++ /dev/null @@ -1,16 +0,0 @@ -FROM python:3.11-slim - -WORKDIR /app - -RUN pip install --no-cache-dir fastapi uvicorn[standard] websockets - -COPY hub.py . 
- -ENV API_KEY="" -ENV EDGE_URL="" -ENV EDGE_API_KEY="" -ENV LOG_LEVEL=INFO - -EXPOSE 8080 - -CMD ["uvicorn", "hub:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/ctrl/hub/docker-compose.yml b/ctrl/hub/docker-compose.yml deleted file mode 100644 index 3ab3aad..0000000 --- a/ctrl/hub/docker-compose.yml +++ /dev/null @@ -1,12 +0,0 @@ -services: - hub: - build: . - container_name: sysmonstm-hub - restart: unless-stopped - environment: - - API_KEY=${API_KEY:-} - - EDGE_URL=${EDGE_URL:-} - - EDGE_API_KEY=${EDGE_API_KEY:-} - - LOG_LEVEL=${LOG_LEVEL:-INFO} - ports: - - "8080:8080" diff --git a/ctrl/hub/hub.py b/ctrl/hub/hub.py deleted file mode 100644 index 39e7afd..0000000 --- a/ctrl/hub/hub.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python3 -""" -sysmonstm hub - Local aggregator that receives from collectors and forwards to edge. - -Runs on the local network, receives metrics from collectors via WebSocket, -and forwards them to the cloud edge. -""" - -import asyncio -import json -import logging -import os - -from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect - -# Configuration -API_KEY = os.environ.get("API_KEY", "") -EDGE_URL = os.environ.get("EDGE_URL", "") # e.g., wss://sysmonstm.mcrn.ar/ws -EDGE_API_KEY = os.environ.get("EDGE_API_KEY", "") -LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") - -# Logging setup -logging.basicConfig( - level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), - format="%(asctime)s [%(levelname)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) -log = logging.getLogger("hub") - -app = FastAPI(title="sysmonstm-hub") - -# State -collector_connections: list[WebSocket] = [] -machines: dict = {} -edge_ws = None - - -async def connect_to_edge(): - """Maintain persistent connection to edge and forward metrics.""" - global edge_ws - - if not EDGE_URL: - log.info("No EDGE_URL configured, running in local-only mode") - return - - import websockets - - url = EDGE_URL - if EDGE_API_KEY: - separator = "&" if "?" 
in url else "?" - url = f"{url}{separator}key={EDGE_API_KEY}" - - while True: - try: - log.info(f"Connecting to edge: {EDGE_URL}") - async with websockets.connect(url) as ws: - edge_ws = ws - log.info("Connected to edge") - - while True: - try: - msg = await asyncio.wait_for(ws.recv(), timeout=30) - # Ignore messages from edge (pings, etc) - except asyncio.TimeoutError: - await ws.ping() - - except asyncio.CancelledError: - break - except Exception as e: - edge_ws = None - log.warning(f"Edge connection error: {e}. Reconnecting in 5s...") - await asyncio.sleep(5) - - -async def forward_to_edge(data: dict): - """Forward metrics to edge if connected.""" - global edge_ws - if edge_ws: - try: - await edge_ws.send(json.dumps(data)) - log.debug(f"Forwarded to edge: {data.get('machine_id')}") - except Exception as e: - log.warning(f"Failed to forward to edge: {e}") - - -@app.on_event("startup") -async def startup(): - asyncio.create_task(connect_to_edge()) - - -@app.get("/health") -async def health(): - return { - "status": "ok", - "machines": len(machines), - "collectors": len(collector_connections), - "edge_connected": edge_ws is not None, - } - - -@app.get("/api/machines") -async def get_machines(): - return machines - - -@app.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket, key: str = Query(default="")): - # Validate API key - if API_KEY and key != API_KEY: - log.warning(f"Invalid API key from {websocket.client}") - await websocket.close(code=4001, reason="Invalid API key") - return - - await websocket.accept() - collector_connections.append(websocket) - client = websocket.client.host if websocket.client else "unknown" - log.info(f"Collector connected: {client}") - - try: - while True: - try: - msg = await asyncio.wait_for(websocket.receive_text(), timeout=30) - data = json.loads(msg) - - if data.get("type") == "metrics": - machine_id = data.get("machine_id", "unknown") - machines[machine_id] = data - log.debug(f"Metrics from {machine_id}: 
cpu={data.get('cpu')}%") - - # Forward to edge - await forward_to_edge(data) - - except asyncio.TimeoutError: - await websocket.send_json({"type": "ping"}) - - except WebSocketDisconnect: - log.info(f"Collector disconnected: {client}") - except Exception as e: - log.error(f"WebSocket error: {e}") - finally: - if websocket in collector_connections: - collector_connections.remove(websocket) - - -if __name__ == "__main__": - import uvicorn - - log.info("Starting sysmonstm hub") - log.info(f" API key: {'configured' if API_KEY else 'not set (open)'}") - log.info(f" Edge URL: {EDGE_URL or 'not configured (local only)'}") - uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/services/gateway/main.py b/services/gateway/main.py index dbcf2dc..9af841b 100644 --- a/services/gateway/main.py +++ b/services/gateway/main.py @@ -32,6 +32,11 @@ logger = setup_logging( log_format=config.log_format, ) +# Edge forwarding config +EDGE_URL = config.edge_url if hasattr(config, "edge_url") else None +EDGE_API_KEY = config.edge_api_key if hasattr(config, "edge_api_key") else "" +edge_ws = None + # WebSocket connection manager class ConnectionManager: @@ -77,6 +82,54 @@ timescale: TimescaleStorage | None = None grpc_channel: grpc.aio.Channel | None = None grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None + +async def connect_to_edge(): + """Maintain persistent WebSocket connection to edge and forward metrics.""" + global edge_ws + + if not EDGE_URL: + logger.info("edge_not_configured", msg="No EDGE_URL set, running local only") + return + + import websockets + + url = EDGE_URL + if EDGE_API_KEY: + separator = "&" if "?" in url else "?" 
+ url = f"{url}{separator}key={EDGE_API_KEY}" + + while True: + try: + logger.info("edge_connecting", url=EDGE_URL) + async with websockets.connect(url) as ws: + edge_ws = ws + logger.info("edge_connected") + + while True: + try: + msg = await asyncio.wait_for(ws.recv(), timeout=30) + # Ignore messages from edge (pings, etc) + except asyncio.TimeoutError: + await ws.ping() + + except asyncio.CancelledError: + break + except Exception as e: + edge_ws = None + logger.warning("edge_connection_error", error=str(e)) + await asyncio.sleep(5) + + +async def forward_to_edge(data: dict): + """Forward metrics to edge if connected.""" + global edge_ws + if edge_ws: + try: + await edge_ws.send(json.dumps(data)) + except Exception as e: + logger.warning("edge_forward_error", error=str(e)) + + # Track recent events for internals view recent_events: list[dict] = [] MAX_RECENT_EVENTS = 100 @@ -133,15 +186,17 @@ async def event_listener(): merged_payload = event.payload # Broadcast merged data to dashboard - await manager.broadcast( - { - "type": "metrics", - "data": merged_payload, - "timestamp": event.timestamp.isoformat(), - } - ) + broadcast_msg = { + "type": "metrics", + "data": merged_payload, + "timestamp": event.timestamp.isoformat(), + } + await manager.broadcast(broadcast_msg) service_stats["websocket_broadcasts"] += 1 + # Forward to edge if connected + await forward_to_edge(broadcast_msg) + # Broadcast to internals (show raw event, not merged) await internals_manager.broadcast( { @@ -176,6 +231,9 @@ async def lifespan(app: FastAPI): # Start event listener in background listener_task = asyncio.create_task(event_listener()) + # Start edge connection if configured + edge_task = asyncio.create_task(connect_to_edge()) + service_stats["started_at"] = datetime.utcnow().isoformat() logger.info("gateway_started") @@ -183,10 +241,15 @@ async def lifespan(app: FastAPI): # Cleanup listener_task.cancel() + edge_task.cancel() try: await listener_task except asyncio.CancelledError: 
pass + try: + await edge_task + except asyncio.CancelledError: + pass if grpc_channel: await grpc_channel.close() diff --git a/shared/config.py b/shared/config.py index 203b752..3b7e881 100644 --- a/shared/config.py +++ b/shared/config.py @@ -74,6 +74,10 @@ class GatewayConfig(BaseConfig): # TimescaleDB - can be set directly via TIMESCALE_URL timescale_url: str = "postgresql://monitor:monitor@localhost:5432/monitor" + # Edge forwarding (optional - for pushing to cloud edge) + edge_url: str = "" # e.g., wss://sysmonstm.mcrn.ar/ws + edge_api_key: str = "" + class AlertsConfig(BaseConfig): """Alerts service configuration."""