From dc3518f138b3e2cd106620ca71a6d3bfeaa247b5 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Thu, 22 Jan 2026 12:55:50 -0300 Subject: [PATCH] new three layer deployment --- CLAUDE.md | 558 ++++-------------- ctrl/README.md | 82 +++ ctrl/collector/Dockerfile | 16 + ctrl/collector/collector.py | 136 +++++ .../dev/docker-compose.override.yml | 0 ctrl/dev/docker-compose.yml | 154 +++++ ctrl/edge/Dockerfile | 14 + ctrl/{standalone => edge}/README.md | 0 ctrl/{standalone => edge}/docker-compose.yml | 7 +- ctrl/{standalone/main.py => edge/edge.py} | 102 +++- ctrl/hub/Dockerfile | 16 + ctrl/hub/docker-compose.yml | 12 + ctrl/hub/hub.py | 151 +++++ ctrl/standalone/Dockerfile | 6 - docker-compose.yml | 155 +---- 15 files changed, 766 insertions(+), 643 deletions(-) create mode 100644 ctrl/README.md create mode 100644 ctrl/collector/Dockerfile create mode 100644 ctrl/collector/collector.py rename docker-compose.override.yml => ctrl/dev/docker-compose.override.yml (100%) create mode 100644 ctrl/dev/docker-compose.yml create mode 100644 ctrl/edge/Dockerfile rename ctrl/{standalone => edge}/README.md (100%) rename ctrl/{standalone => edge}/docker-compose.yml (54%) rename ctrl/{standalone/main.py => edge/edge.py} (66%) create mode 100644 ctrl/hub/Dockerfile create mode 100644 ctrl/hub/docker-compose.yml create mode 100644 ctrl/hub/hub.py delete mode 100644 ctrl/standalone/Dockerfile mode change 100644 => 120000 docker-compose.yml diff --git a/CLAUDE.md b/CLAUDE.md index 1f78792..3e3e8cc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,489 +4,129 @@ A real-time system monitoring platform that streams metrics from multiple machines to a central hub with live web dashboard. Built to demonstrate production microservices patterns (gRPC, FastAPI, streaming, event-driven architecture) while solving a real problem: monitoring development infrastructure across multiple machines. -**Primary Goal:** Interview demonstration project for Python Microservices Engineer position -**Secondary Goal:** Actually useful tool for managing multi-machine development environment -**Time Investment:** Phased approach - MVP in weekend, polish over 2-3 weeks +**Primary Goal:** Portfolio project demonstrating real-time streaming architecture +**Secondary Goal:** Actually useful tool for monitoring multi-machine development environment +**Status:** Working MVP, deployed at sysmonstm.mcrn.ar -## Why This Project +## Deployment Modes -**Interview Alignment:** -- Demonstrates gRPC-based microservices architecture (core requirement) -- Shows streaming patterns (server-side and bidirectional) -- Real-time data aggregation and processing -- Alert/threshold monitoring (maps to fraud detection) -- Event-driven patterns -- Multiple data sources requiring normalization (maps to multiple payment processors) +### Production (3-tier) -**Personal Utility:** -- Monitors existing multi-machine dev setup -- Dashboard stays open, provides real value -- Solves actual pain point -- Will continue running post-interview +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Collector │────▶│ Hub │────▶│ Edge │ +│ (each host) │ │ (local) │ │ (AWS) │ +└─────────────┘ └─────────────┘ └─────────────┘ +``` -**Domain Mapping for Interview:** -- Machine = Payment Processor -- Metrics Stream = Transaction Stream -- Resource Thresholds = Fraud/Limit Detection -- Alert System = Risk Management -- Aggregation Service = Payment Processing Hub +- **Collector** (`ctrl/collector/`) - Lightweight agent on each monitored machine +- **Hub** (`ctrl/hub/`) - Local aggregator, receives from collectors, forwards to edge +- **Edge** (`ctrl/edge/`) - Cloud dashboard, public-facing + +### Development (Full Stack) + +```bash +docker compose up # Uses ctrl/dev/docker-compose.yml +``` + +- Full gRPC-based microservices architecture +- Services: aggregator, gateway, collector, alerts +- Storage: Redis (hot), TimescaleDB (historical) + +## Directory Structure + +``` +sms/ +├── services/ # gRPC-based microservices (dev stack) +│ ├── collector/ # gRPC client, streams to aggregator +│ ├── aggregator/ # gRPC server, stores in Redis/TimescaleDB +│ ├── gateway/ # FastAPI, bridges gRPC to WebSocket +│ └── alerts/ # Event subscriber for threshold alerts +│ +├── ctrl/ # Deployment configurations +│ ├── collector/ # Lightweight WebSocket collector +│ ├── hub/ # Local aggregator +│ ├── edge/ # Cloud dashboard +│ └── dev/ # Full stack docker-compose +│ +├── proto/ # Protocol Buffer definitions +├── shared/ # Shared Python modules +├── web/ # Dashboard templates and static files +├── infra/ # Terraform for AWS deployment +└── k8s/ # Kubernetes manifests +``` + +## Current Setup + +**Machines being monitored:** +- `mcrn` - Primary workstation (runs hub + collector) +- `nfrt` - Secondary machine (runs collector only) + +**Topology:** +``` +mcrn nfrt AWS +├── hub ◄─────────────────── collector edge (sysmonstm.mcrn.ar) +│ │ ▲ +│ └────────────────────────────────────────────────┘ +└── collector +``` ## Technical Stack -### Core Technologies (Must Use - From JD) +### Core Technologies - **Python 3.11+** - Primary language - **FastAPI** - Web gateway, REST endpoints, WebSocket streaming -- **gRPC** - Inter-service communication, metric streaming -- **PostgreSQL/TimescaleDB** - Time-series historical data -- **Redis** - Current state, caching, alert rules -- **Docker Compose** - Orchestration - -### Supporting Technologies -- **Protocol Buffers** - gRPC message definitions -- **WebSockets** - Browser streaming -- **htmx + Alpine.js** - Lightweight reactive frontend (avoid heavy SPA) -- **Chart.js or Apache ECharts** - Real-time graphs -- **asyncio** - Async patterns throughout - -### Development Tools -- **grpcio & grpcio-tools** - Python gRPC +- **gRPC** - Inter-service communication (dev stack) +- **WebSockets** - Production deployment communication - **psutil** - System metrics collection -- **uvicorn** - FastAPI server -- **pytest** - Testing -- **docker-compose** - Local orchestration -## Architecture +### Storage (Dev Stack Only) +- **PostgreSQL/TimescaleDB** - Time-series historical data +- **Redis** - Current state, caching, event pub/sub -``` -┌─────────────────────────────────────────────────────────────┐ -│ Browser │ -│ ┌──────────────────────────────────────────────────────┐ │ -│ │ Dashboard (htmx + Alpine.js + WebSockets) │ │ -│ └──────────────────────────────────────────────────────┘ │ -└────────────────────────┬────────────────────────────────────┘ - │ WebSocket - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Web Gateway Service │ -│ (FastAPI + WebSockets) │ -│ - Serves dashboard │ -│ - Streams updates to browser │ -│ - REST API for historical queries │ -└────────────────────────┬────────────────────────────────────┘ - │ gRPC - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Aggregator Service (gRPC) │ -│ - Receives metric streams from all collectors │ -│ - Normalizes data from different sources │ -│ - Enriches with machine context │ -│ - Publishes to event stream │ -│ - Checks alert thresholds │ -└─────┬───────────────────────────────────┬───────────────────┘ - │ │ - │ Stores │ Publishes events - ▼ ▼ -┌──────────────┐ ┌────────────────┐ -│ TimescaleDB │ │ Event Stream │ -│ (historical)│ │ (Redis Pub/Sub│ -└──────────────┘ │ or RabbitMQ) │ - └────────┬───────┘ -┌──────────────┐ │ -│ Redis │ │ Subscribes -│ (current │◄───────────────────────────┘ -│ state) │ │ -└──────────────┘ ▼ - ┌────────────────┐ - ▲ │ Alert Service │ - │ │ - Processes │ - │ │ events │ - │ gRPC Streaming │ - Triggers │ - │ │ actions │ -┌─────┴────────────────────────────┴────────────────┘ -│ -│ Multiple Collector Services (one per machine) -│ ┌───────────────────────────────────────┐ -│ │ Metrics Collector (gRPC Client) │ -│ │ - Gathers system metrics (psutil) │ -│ │ - Streams to Aggregator via gRPC │ -│ │ - CPU, Memory, Disk, Network │ -│ │ - Process list │ -│ │ - Docker container stats (optional) │ -│ └───────────────────────────────────────┘ -│ -└──► Machine 1, Machine 2, Machine 3, ... -``` +### Infrastructure +- **Docker Compose** - Orchestration +- **Woodpecker CI** - Build pipeline at ppl/pipelines/sysmonstm/ +- **Registry** - registry.mcrn.ar/sysmonstm/ -## Implementation Phases +## Images -### Phase 1: MVP - Core Streaming (Weekend - 8-12 hours) - -**Goal:** Prove the gRPC streaming works end-to-end - -**Deliverables:** -1. Metrics Collector Service (gRPC client) - - Collects CPU, memory, disk on localhost - - Streams to aggregator every 5 seconds - -2. Aggregator Service (gRPC server) - - Receives metric stream - - Stores current state in Redis - - Logs to console - -3. Proto definitions for metric messages - -4. Docker Compose setup - -**Success Criteria:** -- Run collector, see metrics flowing to aggregator -- Redis contains current state -- Can query Redis manually for latest metrics - -### Phase 2: Web Dashboard (1 week) - -**Goal:** Make it visible and useful - -**Deliverables:** -1. Web Gateway Service (FastAPI) - - WebSocket endpoint for streaming - - REST endpoints for current/historical data - -2. Dashboard UI - - Real-time CPU/Memory graphs per machine - - Current state table - - Simple, clean design - -3. WebSocket bridge (Gateway ↔ Aggregator) - -4. TimescaleDB integration - - Store historical metrics - - Query endpoints for time ranges - -**Success Criteria:** -- Open dashboard, see live graphs updating -- Graphs show last hour of data -- Multiple machines displayed separately - -### Phase 3: Alerts & Intelligence (1 week) - -**Goal:** Add decision-making layer (interview focus) - -**Deliverables:** -1. Alert Service - - Subscribes to event stream - - Evaluates threshold rules - - Triggers notifications - -2. Configuration Service (gRPC) - - Dynamic threshold management - - Alert rule CRUD - - Stored in PostgreSQL - -3. Event Stream implementation (Redis Pub/Sub or RabbitMQ) - -4. Enhanced dashboard - - Alert indicators - - Alert history - - Threshold configuration UI - -**Success Criteria:** -- Set CPU threshold at 80% -- Generate load (stress-ng) -- See alert trigger in dashboard -- Alert logged to database - -### Phase 4: Interview Polish (Final week) - -**Goal:** Demo-ready, production patterns visible - -**Deliverables:** -1. Observability - - OpenTelemetry tracing (optional) - - Structured logging - - Health check endpoints - -2. "Synthetic Transactions" - - Simulate business operations through system - - Track end-to-end latency - - Maps directly to payment processing demo - -3. Documentation - - Architecture diagram - - Service interaction flows - - Deployment guide - -4. Demo script - - Story to walk through - - Key talking points - - Domain mapping explanations - -**Success Criteria:** -- Can deploy entire stack with one command -- Can explain every service's role -- Can map architecture to payment processing -- Demo runs smoothly without hiccups - -## Key Technical Patterns to Demonstrate - -### 1. gRPC Streaming Patterns - -**Server-Side Streaming:** -```python -# Collector streams metrics to aggregator -service MetricsService { - rpc StreamMetrics(MetricsRequest) returns (stream Metric) {} -} -``` - -**Bidirectional Streaming:** -```python -# Two-way communication between services -service ControlService { - rpc ManageStream(stream Command) returns (stream Response) {} -} -``` - -### 2. Service Communication Patterns - -- **Synchronous (gRPC):** Query current state, configuration -- **Asynchronous (Events):** Metric updates, alerts, audit logs -- **Streaming (gRPC + WebSocket):** Real-time data flow - -### 3. Data Storage Patterns - -- **Hot data (Redis):** Current state, recent metrics (last 5 minutes) -- **Warm data (TimescaleDB):** Historical metrics (last 30 days) -- **Cold data (Optional):** Archive to S3-compatible storage - -### 4. Error Handling & Resilience - -- gRPC retry logic with exponential backoff -- Circuit breaker pattern for service calls -- Graceful degradation (continue if one collector fails) -- Dead letter queue for failed events - -## Proto Definitions (Starting Point) - -```protobuf -syntax = "proto3"; - -package monitoring; - -service MetricsService { - rpc StreamMetrics(MetricsRequest) returns (stream Metric) {} - rpc GetCurrentState(StateRequest) returns (MachineState) {} -} - -message MetricsRequest { - string machine_id = 1; - int32 interval_seconds = 2; -} - -message Metric { - string machine_id = 1; - int64 timestamp = 2; - MetricType type = 3; - double value = 4; - map labels = 5; -} - -enum MetricType { - CPU_PERCENT = 0; - MEMORY_PERCENT = 1; - MEMORY_USED_GB = 2; - DISK_PERCENT = 3; - NETWORK_SENT_MBPS = 4; - NETWORK_RECV_MBPS = 5; -} - -message MachineState { - string machine_id = 1; - int64 last_seen = 2; - repeated Metric current_metrics = 3; - HealthStatus health = 4; -} - -enum HealthStatus { - HEALTHY = 0; - WARNING = 1; - CRITICAL = 2; - UNKNOWN = 3; -} -``` - -## Project Structure - -``` -system-monitor/ -├── docker-compose.yml -├── proto/ -│ └── metrics.proto -├── services/ -│ ├── collector/ -│ │ ├── Dockerfile -│ │ ├── requirements.txt -│ │ ├── main.py -│ │ └── metrics.py -│ ├── aggregator/ -│ │ ├── Dockerfile -│ │ ├── requirements.txt -│ │ ├── main.py -│ │ └── storage.py -│ ├── gateway/ -│ │ ├── Dockerfile -│ │ ├── requirements.txt -│ │ ├── main.py -│ │ └── websocket.py -│ └── alerts/ -│ ├── Dockerfile -│ ├── requirements.txt -│ ├── main.py -│ └── rules.py -├── web/ -│ ├── static/ -│ │ ├── css/ -│ │ └── js/ -│ └── templates/ -│ └── dashboard.html -└── README.md -``` - -## Interview Talking Points - -### Domain Mapping to Payments - -**What you say:** -- "I built this to monitor my dev machines, but the architecture directly maps to payment processing" -- "Each machine streaming metrics is like a payment processor streaming transactions" -- "The aggregator normalizes data from different sources - same as aggregating from Stripe, PayPal, bank APIs" -- "Alert thresholds on resource usage are structurally identical to fraud detection thresholds" -- "The event stream for audit trails maps directly to payment audit logs" - -### Technical Decisions to Highlight - -**gRPC vs REST:** -- "I use gRPC between services for efficiency and strong typing" -- "FastAPI gateway exposes REST/WebSocket for browser clients" -- "This pattern is common - internal gRPC, external REST" - -**Streaming vs Polling:** -- "Server-side streaming reduces network overhead" -- "Bidirectional streaming allows dynamic configuration updates" -- "WebSocket to browser maintains single connection" - -**State Management:** -- "Redis for hot data - current state, needs fast access" -- "TimescaleDB for historical analysis - optimized for time-series" -- "This tiered storage approach scales to payment transaction volumes" - -**Resilience:** -- "Each collector is independent - one failing doesn't affect others" -- "Circuit breaker prevents cascade failures" -- "Event stream decouples alert processing from metric ingestion" - -### What NOT to Say - -- Don't call it a "toy project" or "learning exercise" -- Don't apologize for running locally vs AWS -- Don't over-explain obvious things -- Don't claim it's production-ready when it's not - -### What TO Say - -- "I built this to solve a real problem I have" -- "Locally it uses PostgreSQL/Redis, in production these become Aurora/ElastiCache" -- "I focused on the architectural patterns since those transfer directly" -- "I'd keep developing this - it's genuinely useful" +| Image | Purpose | +|-------|---------| +| `collector` | Lightweight WebSocket collector for production | +| `hub` | Local aggregator for production | +| `edge` | Cloud dashboard for production | +| `aggregator` | gRPC aggregator (dev stack) | +| `gateway` | FastAPI gateway (dev stack) | +| `collector-grpc` | gRPC collector (dev stack) | +| `alerts` | Alert service (dev stack) | ## Development Guidelines -### Code Quality Standards +### Code Quality - Type hints throughout (Python 3.11+ syntax) - Async/await patterns consistently -- Structured logging (JSON format) -- Error handling at all boundaries -- Unit tests for business logic -- Integration tests for service interactions +- Logging (not print statements) +- Error handling at boundaries -### Docker Best Practices -- Multi-stage builds -- Non-root users -- Health checks -- Resource limits -- Volume mounts for development +### Docker +- Multi-stage builds for smaller images +- `--network host` for collectors (accurate network metrics) -### Configuration Management +### Configuration - Environment variables for all config - Sensible defaults -- Config validation on startup - No secrets in code -## AWS Mapping (For Interview Discussion) +## Interview/Portfolio Talking Points -**What you have → What it becomes:** -- PostgreSQL → Aurora PostgreSQL -- Redis → ElastiCache -- Docker Containers → ECS/Fargate or Lambda -- RabbitMQ/Redis Pub/Sub → SQS/SNS -- Docker Compose → CloudFormation/Terraform -- Local networking → VPC, Security Groups +### Architecture Decisions +- "3-tier for production: collector → hub → edge" +- "Hub allows local aggregation and buffering before forwarding to cloud" +- "Edge terminology shows awareness of edge computing patterns" +- "Full gRPC stack for development demonstrates microservices patterns" -**Key point:** "The architecture and patterns are production-ready, the infrastructure is local for development convenience" - -## Common Pitfalls to Avoid - -1. **Over-engineering Phase 1** - Resist adding features, just get streaming working -2. **Ugly UI** - Don't waste time on design, htmx + basic CSS is fine -3. **Perfect metrics** - Mock data is OK early on, real psutil data comes later -4. **Complete coverage** - Better to have 3 services working perfectly than 10 half-done -5. **AWS deployment** - Local is fine, AWS costs money and adds complexity - -## Success Metrics - -**For Yourself:** -- [ ] Actually use the dashboard daily -- [ ] Catches a real issue before you notice -- [ ] Runs stable for 1+ week without intervention - -**For Interview:** -- [ ] Can demo end-to-end in 5 minutes -- [ ] Can explain every service interaction -- [ ] Can map to payment domain fluently -- [ ] Shows understanding of production patterns - -## Next Steps - -1. Set up project structure -2. Define proto messages -3. Build Phase 1 MVP -4. Iterate based on what feels useful -5. Polish for demo when interview approaches - -## Resources - -- gRPC Python docs: https://grpc.io/docs/languages/python/ -- FastAPI WebSockets: https://fastapi.tiangolo.com/advanced/websockets/ -- TimescaleDB: https://docs.timescale.com/ -- htmx: https://htmx.org/ - -## Questions to Ask Yourself During Development - -- "Would I actually use this feature?" -- "How does this map to payments?" -- "Can I explain why I built it this way?" -- "What would break if X service failed?" -- "How would this scale to 1000 machines?" - ---- - -## Final Note - -This project works because it's: -1. **Real** - You'll use it -2. **Focused** - Shows specific patterns they care about -3. **Mappable** - Clear connection to their domain -4. **Yours** - Not a tutorial copy, demonstrates your thinking - -Build it in phases, use it daily, and by interview time you'll have natural stories about trade-offs, failures, and learnings. That authenticity is more valuable than perfect code. - -Good luck! 🚀 +### Trade-offs +- Production vs Dev: simplicity/cost vs full architecture demo +- WebSocket vs gRPC: browser compatibility vs efficiency +- In-memory vs persistent: operational simplicity vs durability diff --git a/ctrl/README.md b/ctrl/README.md new file mode 100644 index 0000000..b999053 --- /dev/null +++ b/ctrl/README.md @@ -0,0 +1,82 @@ +# Deployment Configurations + +This directory contains deployment configurations for sysmonstm. + +## Architecture + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Collector │────▶│ Hub │────▶│ Edge │────▶│ Browser │ +│ (mcrn) │ │ (local) │ │ (AWS) │ │ │ +└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ +┌─────────────┐ │ +│ Collector │────────────┘ +│ (nfrt) │ +└─────────────┘ +``` + +## Directory Structure + +``` +ctrl/ +├── collector/ # Lightweight agent for each monitored machine +├── hub/ # Local aggregator (receives from collectors, forwards to edge) +├── edge/ # Cloud dashboard (public-facing, receives from hub) +└── dev/ # Full gRPC stack for development +``` + +## Production Deployment (3-tier) + +### 1. Edge (AWS) +Public-facing dashboard that receives metrics from hub. + +```bash +cd ctrl/edge +docker compose up -d +``` + +### 2. Hub (Local Server) +Runs on your local network, receives from collectors, forwards to edge. + +```bash +cd ctrl/hub +EDGE_URL=wss://sysmonstm.mcrn.ar/ws EDGE_API_KEY=xxx docker compose up -d +``` + +### 3. Collectors (Each Machine) +Run on each machine you want to monitor. + +```bash +docker run -d --name sysmonstm-collector --network host \ + -e HUB_URL=ws://hub-machine:8080/ws \ + -e MACHINE_ID=$(hostname) \ + -e API_KEY=xxx \ + registry.mcrn.ar/sysmonstm/collector:latest +``` + +## Development (Full Stack) + +For local development with the complete gRPC-based architecture: + +```bash +# From repo root +docker compose up +``` + +This runs: aggregator, gateway, collector, alerts, redis, timescaledb + +## Environment Variables + +### Collector +- `HUB_URL` - WebSocket URL of hub (default: ws://localhost:8080/ws) +- `MACHINE_ID` - Identifier for this machine (default: hostname) +- `API_KEY` - Authentication key +- `INTERVAL` - Seconds between collections (default: 5) + +### Hub +- `API_KEY` - Key required from collectors +- `EDGE_URL` - WebSocket URL of edge (optional, for forwarding) +- `EDGE_API_KEY` - Key for authenticating to edge + +### Edge +- `API_KEY` - Key required from hub diff --git a/ctrl/collector/Dockerfile b/ctrl/collector/Dockerfile new file mode 100644 index 0000000..d9556c8 --- /dev/null +++ b/ctrl/collector/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN pip install --no-cache-dir psutil websockets + +COPY collector.py . + +# Default environment variables +ENV HUB_URL=ws://localhost:8080/ws +ENV MACHINE_ID="" +ENV API_KEY="" +ENV INTERVAL=5 +ENV LOG_LEVEL=INFO + +CMD ["python", "collector.py"] diff --git a/ctrl/collector/collector.py b/ctrl/collector/collector.py new file mode 100644 index 0000000..68a10a9 --- /dev/null +++ b/ctrl/collector/collector.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +"""Lightweight WebSocket metrics collector for sysmonstm standalone deployment.""" + +import asyncio +import json +import logging +import os +import socket +import time + +import psutil + +# Configuration from environment +HUB_URL = os.environ.get("HUB_URL", "ws://localhost:8080/ws") +MACHINE_ID = os.environ.get("MACHINE_ID", socket.gethostname()) +API_KEY = os.environ.get("API_KEY", "") +INTERVAL = int(os.environ.get("INTERVAL", "5")) +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") + +# Logging setup +logging.basicConfig( + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("collector") + + +def collect_metrics() -> dict: + """Collect system metrics using psutil.""" + metrics = { + "type": "metrics", + "machine_id": MACHINE_ID, + "hostname": socket.gethostname(), + "timestamp": time.time(), + } + + # CPU + try: + metrics["cpu"] = psutil.cpu_percent(interval=None) + except Exception: + pass + + # Memory + try: + mem = psutil.virtual_memory() + metrics["memory"] = mem.percent + metrics["memory_used_gb"] = round(mem.used / (1024**3), 2) + metrics["memory_total_gb"] = round(mem.total / (1024**3), 2) + except Exception: + pass + + # Disk + try: + disk = psutil.disk_usage("/") + metrics["disk"] = disk.percent + metrics["disk_used_gb"] = round(disk.used / (1024**3), 2) + metrics["disk_total_gb"] = round(disk.total / (1024**3), 2) + except Exception: + pass + + # Load average (Unix only) + try: + load1, load5, load15 = psutil.getloadavg() + metrics["load_1m"] = round(load1, 2) + metrics["load_5m"] = round(load5, 2) + metrics["load_15m"] = round(load15, 2) + except (AttributeError, OSError): + pass + + # Network connections count + try: + metrics["connections"] = len(psutil.net_connections(kind="inet")) + except (psutil.AccessDenied, PermissionError): + pass + + # Process count + try: + metrics["processes"] = len(psutil.pids()) + except Exception: + pass + + return metrics + + +async def run_collector(): + """Main collector loop with auto-reconnect.""" + import websockets + + # Build URL with API key if provided + url = HUB_URL + if API_KEY: + separator = "&" if "?" in url else "?" + url = f"{url}{separator}key={API_KEY}" + + # Prime CPU percent (first call always returns 0) + psutil.cpu_percent(interval=None) + + while True: + try: + log.info(f"Connecting to {HUB_URL}...") + async with websockets.connect(url) as ws: + log.info( + f"Connected. Sending metrics every {INTERVAL}s as '{MACHINE_ID}'" + ) + + while True: + metrics = collect_metrics() + await ws.send(json.dumps(metrics)) + log.debug( + f"Sent: cpu={metrics.get('cpu', '?')}% mem={metrics.get('memory', '?')}% disk={metrics.get('disk', '?')}%" + ) + await asyncio.sleep(INTERVAL) + + except asyncio.CancelledError: + log.info("Collector stopped") + break + except Exception as e: + log.warning(f"Connection error: {e}. Reconnecting in 5s...") + await asyncio.sleep(5) + + +def main(): + log.info("sysmonstm collector starting") + log.info(f" Hub: {HUB_URL}") + log.info(f" Machine: {MACHINE_ID}") + log.info(f" Interval: {INTERVAL}s") + + try: + asyncio.run(run_collector()) + except KeyboardInterrupt: + log.info("Stopped") + + +if __name__ == "__main__": + main() diff --git a/docker-compose.override.yml b/ctrl/dev/docker-compose.override.yml similarity index 100% rename from docker-compose.override.yml rename to ctrl/dev/docker-compose.override.yml diff --git a/ctrl/dev/docker-compose.yml b/ctrl/dev/docker-compose.yml new file mode 100644 index 0000000..efc32da --- /dev/null +++ b/ctrl/dev/docker-compose.yml @@ -0,0 +1,154 @@ +version: "3.8" + +# This file works both locally and on EC2 for demo purposes. +# For local dev with hot-reload, use: docker compose -f docker-compose.yml -f docker-compose.override.yml up + +x-common-env: &common-env + REDIS_URL: redis://redis:6379 + TIMESCALE_URL: postgresql://monitor:monitor@timescaledb:5432/monitor + EVENTS_BACKEND: redis_pubsub + LOG_LEVEL: ${LOG_LEVEL:-INFO} + LOG_FORMAT: json + +x-healthcheck-defaults: &healthcheck-defaults + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s + +services: + # ============================================================================= + # Infrastructure + # ============================================================================= + + redis: + image: redis:7-alpine + ports: + - "${REDIS_PORT:-6379}:6379" + volumes: + - redis-data:/data + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "redis-cli", "ping"] + deploy: + resources: + limits: + memory: 128M + + timescaledb: + image: timescale/timescaledb:latest-pg15 + environment: + POSTGRES_USER: monitor + POSTGRES_PASSWORD: monitor + POSTGRES_DB: monitor + ports: + - "${TIMESCALE_PORT:-5432}:5432" + volumes: + - timescale-data:/var/lib/postgresql/data + - ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro + healthcheck: + <<: *healthcheck-defaults + test: ["CMD-SHELL", "pg_isready -U monitor -d monitor"] + deploy: + resources: + limits: + memory: 512M + + # ============================================================================= + # Application Services + # ============================================================================= + + aggregator: + build: + context: . + dockerfile: services/aggregator/Dockerfile + environment: + <<: *common-env + GRPC_PORT: 50051 + SERVICE_NAME: aggregator + ports: + - "${AGGREGATOR_GRPC_PORT:-50051}:50051" + depends_on: + redis: + condition: service_healthy + timescaledb: + condition: service_healthy + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "/bin/grpc_health_probe", "-addr=:50051"] + deploy: + resources: + limits: + memory: 256M + + gateway: + build: + context: . + dockerfile: services/gateway/Dockerfile + environment: + <<: *common-env + HTTP_PORT: 8000 + AGGREGATOR_URL: aggregator:50051 + SERVICE_NAME: gateway + ports: + - "${GATEWAY_PORT:-8000}:8000" + depends_on: + - aggregator + - redis + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + deploy: + resources: + limits: + memory: 256M + + alerts: + build: + context: . + dockerfile: services/alerts/Dockerfile + environment: + <<: *common-env + SERVICE_NAME: alerts + depends_on: + redis: + condition: service_healthy + timescaledb: + condition: service_healthy + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "python", "-c", "import sys; sys.exit(0)"] + deploy: + resources: + limits: + memory: 128M + + # Collector runs separately on each machine being monitored + # For local testing, we run one instance + collector: + build: + context: . + dockerfile: services/collector/Dockerfile + environment: + <<: *common-env + AGGREGATOR_URL: aggregator:50051 + MACHINE_ID: ${MACHINE_ID:-local-dev} + COLLECTION_INTERVAL: ${COLLECTION_INTERVAL:-5} + SERVICE_NAME: collector + depends_on: + - aggregator + deploy: + resources: + limits: + memory: 64M + # For actual system metrics, you might need: + # privileged: true + # pid: host + +volumes: + redis-data: + timescale-data: + +networks: + default: + name: sysmonstm diff --git a/ctrl/edge/Dockerfile b/ctrl/edge/Dockerfile new file mode 100644 index 0000000..9e92404 --- /dev/null +++ b/ctrl/edge/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN pip install --no-cache-dir fastapi uvicorn[standard] websockets + +COPY edge.py . + +ENV API_KEY="" +ENV LOG_LEVEL=INFO + +EXPOSE 8080 + +CMD ["uvicorn", "edge:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/ctrl/standalone/README.md b/ctrl/edge/README.md similarity index 100% rename from ctrl/standalone/README.md rename to ctrl/edge/README.md diff --git a/ctrl/standalone/docker-compose.yml b/ctrl/edge/docker-compose.yml similarity index 54% rename from ctrl/standalone/docker-compose.yml rename to ctrl/edge/docker-compose.yml index ba99eb0..db359b3 100644 --- a/ctrl/standalone/docker-compose.yml +++ b/ctrl/edge/docker-compose.yml @@ -1,8 +1,11 @@ services: - sysmonstm: + edge: build: . - container_name: sysmonstm + container_name: sysmonstm-edge restart: unless-stopped + environment: + - API_KEY=${API_KEY:-} + - LOG_LEVEL=${LOG_LEVEL:-INFO} ports: - "8080:8080" networks: diff --git a/ctrl/standalone/main.py b/ctrl/edge/edge.py similarity index 66% rename from ctrl/standalone/main.py rename to ctrl/edge/edge.py index d87326d..581fe00 100644 --- a/ctrl/standalone/main.py +++ b/ctrl/edge/edge.py @@ -1,11 +1,26 @@ """Minimal sysmonstm gateway - standalone mode without dependencies.""" -from fastapi import FastAPI, WebSocket, WebSocketDisconnect -from fastapi.responses import HTMLResponse -import json import asyncio +import json +import logging +import os from datetime import datetime +from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect +from fastapi.responses import HTMLResponse + +# Configuration +API_KEY = os.environ.get("API_KEY", "") +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") + +# Logging setup +logging.basicConfig( + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("gateway") + app = FastAPI(title="sysmonstm") # Store connected websockets @@ -107,19 +122,20 @@ HTML = """ let machines = {}; function connect() { - const ws = new WebSocket(`wss://${location.host}/ws`); - + const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + const ws = new WebSocket(`${protocol}//${location.host}/ws`); + ws.onopen = () => { statusDot.classList.add('ok'); statusText.textContent = 'connected'; }; - + ws.onclose = () => { statusDot.classList.remove('ok'); statusText.textContent = 'disconnected'; setTimeout(connect, 2000); }; - + ws.onmessage = (e) => { const data = JSON.parse(e.data); if (data.type === 'metrics') { @@ -128,71 +144,113 @@ HTML = """ } }; } - + function render() { - const ids = Object.keys(machines); + const ids = Object.keys(machines).sort(); if (ids.length === 0) { machinesEl.innerHTML = '

No collectors connected

Start a collector to see metrics

'; return; } - + machinesEl.innerHTML = ids.map(id => { const m = machines[id]; + const ts = m.timestamp ? new Date(m.timestamp * 1000).toLocaleTimeString() : '-'; return `

${id}

CPU${m.cpu?.toFixed(1) || '-'}%
Memory${m.memory?.toFixed(1) || '-'}%
Disk${m.disk?.toFixed(1) || '-'}%
-
Updated${new Date(m.timestamp).toLocaleTimeString()}
+
Load (1m)${m.load_1m?.toFixed(2) || '-'}
+
Processes${m.processes || '-'}
+
Updated${ts}
`; }).join(''); } - + connect(); """ + @app.get("/", response_class=HTMLResponse) async def index(): return HTML + @app.get("/health") async def health(): return {"status": "ok", "machines": len(machines)} + +@app.get("/api/machines") +async def get_machines(): + return machines + + @app.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): +async def websocket_endpoint(websocket: WebSocket, key: str = Query(default="")): + # API key validation for collectors (browsers don't need key) + # Check if this looks like a collector (will send metrics) or browser (will receive) + # We validate key only when metrics are received, allowing browsers to connect freely + await websocket.accept() connections.append(websocket) + client = websocket.client.host if websocket.client else "unknown" + log.info(f"WebSocket connected: {client}") + try: - # Send current state + # Send current state to new connection for machine_id, data in machines.items(): - await websocket.send_json({"type": "metrics", "machine_id": machine_id, **data}) - # Keep alive + await websocket.send_json( + {"type": "metrics", "machine_id": machine_id, **data} + ) + + # Main loop while True: try: msg = await asyncio.wait_for(websocket.receive_text(), timeout=30) data = json.loads(msg) + if data.get("type") == "metrics": + # Validate API key for metric submissions + if API_KEY and key != API_KEY: + log.warning(f"Invalid API key from {client}") + await websocket.close(code=4001, reason="Invalid API key") + return + machine_id = data.get("machine_id", "unknown") - machines[machine_id] = {**data, "timestamp": datetime.utcnow().isoformat()} - # Broadcast to all + machines[machine_id] = data + log.debug(f"Metrics from {machine_id}: cpu={data.get('cpu')}%") + + # Broadcast to all connected clients for conn in connections: try: - await conn.send_json({"type": "metrics", "machine_id": machine_id, **machines[machine_id]}) - except: + await conn.send_json( + {"type": "metrics", "machine_id": machine_id, **data} + ) + except Exception: pass + except asyncio.TimeoutError: + # Send ping to keep connection alive await websocket.send_json({"type": "ping"}) + except WebSocketDisconnect: - pass + log.info(f"WebSocket disconnected: {client}") + except Exception as e: + log.error(f"WebSocket error: {e}") finally: - connections.remove(websocket) + if websocket in connections: + connections.remove(websocket) + if __name__ == "__main__": import uvicorn + + log.info("Starting sysmonstm gateway") + log.info(f" API key: {'configured' if API_KEY else 'not set (open)'}") uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/ctrl/hub/Dockerfile b/ctrl/hub/Dockerfile new file mode 100644 index 0000000..2488bb5 --- /dev/null +++ b/ctrl/hub/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN pip install --no-cache-dir fastapi uvicorn[standard] websockets + +COPY hub.py . + +ENV API_KEY="" +ENV EDGE_URL="" +ENV EDGE_API_KEY="" +ENV LOG_LEVEL=INFO + +EXPOSE 8080 + +CMD ["uvicorn", "hub:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/ctrl/hub/docker-compose.yml b/ctrl/hub/docker-compose.yml new file mode 100644 index 0000000..3ab3aad --- /dev/null +++ b/ctrl/hub/docker-compose.yml @@ -0,0 +1,12 @@ +services: + hub: + build: . + container_name: sysmonstm-hub + restart: unless-stopped + environment: + - API_KEY=${API_KEY:-} + - EDGE_URL=${EDGE_URL:-} + - EDGE_API_KEY=${EDGE_API_KEY:-} + - LOG_LEVEL=${LOG_LEVEL:-INFO} + ports: + - "8080:8080" diff --git a/ctrl/hub/hub.py b/ctrl/hub/hub.py new file mode 100644 index 0000000..39e7afd --- /dev/null +++ b/ctrl/hub/hub.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +""" +sysmonstm hub - Local aggregator that receives from collectors and forwards to edge. + +Runs on the local network, receives metrics from collectors via WebSocket, +and forwards them to the cloud edge. +""" + +import asyncio +import json +import logging +import os + +from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect + +# Configuration +API_KEY = os.environ.get("API_KEY", "") +EDGE_URL = os.environ.get("EDGE_URL", "") # e.g., wss://sysmonstm.mcrn.ar/ws +EDGE_API_KEY = os.environ.get("EDGE_API_KEY", "") +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") + +# Logging setup +logging.basicConfig( + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("hub") + +app = FastAPI(title="sysmonstm-hub") + +# State +collector_connections: list[WebSocket] = [] +machines: dict = {} +edge_ws = None + + +async def connect_to_edge(): + """Maintain persistent connection to edge and forward metrics.""" + global edge_ws + + if not EDGE_URL: + log.info("No EDGE_URL configured, running in local-only mode") + return + + import websockets + + url = EDGE_URL + if EDGE_API_KEY: + separator = "&" if "?" in url else "?" + url = f"{url}{separator}key={EDGE_API_KEY}" + + while True: + try: + log.info(f"Connecting to edge: {EDGE_URL}") + async with websockets.connect(url) as ws: + edge_ws = ws + log.info("Connected to edge") + + while True: + try: + msg = await asyncio.wait_for(ws.recv(), timeout=30) + # Ignore messages from edge (pings, etc) + except asyncio.TimeoutError: + await ws.ping() + + except asyncio.CancelledError: + break + except Exception as e: + edge_ws = None + log.warning(f"Edge connection error: {e}. Reconnecting in 5s...") + await asyncio.sleep(5) + + +async def forward_to_edge(data: dict): + """Forward metrics to edge if connected.""" + global edge_ws + if edge_ws: + try: + await edge_ws.send(json.dumps(data)) + log.debug(f"Forwarded to edge: {data.get('machine_id')}") + except Exception as e: + log.warning(f"Failed to forward to edge: {e}") + + +@app.on_event("startup") +async def startup(): + asyncio.create_task(connect_to_edge()) + + +@app.get("/health") +async def health(): + return { + "status": "ok", + "machines": len(machines), + "collectors": len(collector_connections), + "edge_connected": edge_ws is not None, + } + + +@app.get("/api/machines") +async def get_machines(): + return machines + + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket, key: str = Query(default="")): + # Validate API key + if API_KEY and key != API_KEY: + log.warning(f"Invalid API key from {websocket.client}") + await websocket.close(code=4001, reason="Invalid API key") + return + + await websocket.accept() + collector_connections.append(websocket) + client = websocket.client.host if websocket.client else "unknown" + log.info(f"Collector connected: {client}") + + try: + while True: + try: + msg = await asyncio.wait_for(websocket.receive_text(), timeout=30) + data = json.loads(msg) + + if data.get("type") == "metrics": + machine_id = data.get("machine_id", "unknown") + machines[machine_id] = data + log.debug(f"Metrics from {machine_id}: cpu={data.get('cpu')}%") + + # Forward to edge + await forward_to_edge(data) + + except asyncio.TimeoutError: + await websocket.send_json({"type": "ping"}) + + except WebSocketDisconnect: + log.info(f"Collector disconnected: {client}") + except Exception as e: + log.error(f"WebSocket error: {e}") + finally: + if websocket in collector_connections: + collector_connections.remove(websocket) + + +if __name__ == "__main__": + import uvicorn + + log.info("Starting sysmonstm hub") + log.info(f" API key: {'configured' if API_KEY else 'not set (open)'}") + log.info(f" Edge URL: {EDGE_URL or 'not configured (local only)'}") + uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/ctrl/standalone/Dockerfile b/ctrl/standalone/Dockerfile deleted file mode 100644 index d3cc8d6..0000000 --- a/ctrl/standalone/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM python:3.11-slim -WORKDIR /app -RUN pip install --no-cache-dir fastapi uvicorn[standard] websockets -COPY main.py . -EXPOSE 8080 -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index efc32da..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,154 +0,0 @@ -version: "3.8" - -# This file works both locally and on EC2 for demo purposes. -# For local dev with hot-reload, use: docker compose -f docker-compose.yml -f docker-compose.override.yml up - -x-common-env: &common-env - REDIS_URL: redis://redis:6379 - TIMESCALE_URL: postgresql://monitor:monitor@timescaledb:5432/monitor - EVENTS_BACKEND: redis_pubsub - LOG_LEVEL: ${LOG_LEVEL:-INFO} - LOG_FORMAT: json - -x-healthcheck-defaults: &healthcheck-defaults - interval: 10s - timeout: 5s - retries: 3 - start_period: 10s - -services: - # ============================================================================= - # Infrastructure - # ============================================================================= - - redis: - image: redis:7-alpine - ports: - - "${REDIS_PORT:-6379}:6379" - volumes: - - redis-data:/data - healthcheck: - <<: *healthcheck-defaults - test: ["CMD", "redis-cli", "ping"] - deploy: - resources: - limits: - memory: 128M - - timescaledb: - image: timescale/timescaledb:latest-pg15 - environment: - POSTGRES_USER: monitor - POSTGRES_PASSWORD: monitor - POSTGRES_DB: monitor - ports: - - "${TIMESCALE_PORT:-5432}:5432" - volumes: - - timescale-data:/var/lib/postgresql/data - - ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro - healthcheck: - <<: *healthcheck-defaults - test: ["CMD-SHELL", "pg_isready -U monitor -d monitor"] - deploy: - resources: - limits: - memory: 512M - - # ============================================================================= - # Application Services - # ============================================================================= - - aggregator: - build: - context: . - dockerfile: services/aggregator/Dockerfile - environment: - <<: *common-env - GRPC_PORT: 50051 - SERVICE_NAME: aggregator - ports: - - "${AGGREGATOR_GRPC_PORT:-50051}:50051" - depends_on: - redis: - condition: service_healthy - timescaledb: - condition: service_healthy - healthcheck: - <<: *healthcheck-defaults - test: ["CMD", "/bin/grpc_health_probe", "-addr=:50051"] - deploy: - resources: - limits: - memory: 256M - - gateway: - build: - context: . - dockerfile: services/gateway/Dockerfile - environment: - <<: *common-env - HTTP_PORT: 8000 - AGGREGATOR_URL: aggregator:50051 - SERVICE_NAME: gateway - ports: - - "${GATEWAY_PORT:-8000}:8000" - depends_on: - - aggregator - - redis - healthcheck: - <<: *healthcheck-defaults - test: ["CMD", "curl", "-f", "http://localhost:8000/health"] - deploy: - resources: - limits: - memory: 256M - - alerts: - build: - context: . - dockerfile: services/alerts/Dockerfile - environment: - <<: *common-env - SERVICE_NAME: alerts - depends_on: - redis: - condition: service_healthy - timescaledb: - condition: service_healthy - healthcheck: - <<: *healthcheck-defaults - test: ["CMD", "python", "-c", "import sys; sys.exit(0)"] - deploy: - resources: - limits: - memory: 128M - - # Collector runs separately on each machine being monitored - # For local testing, we run one instance - collector: - build: - context: . - dockerfile: services/collector/Dockerfile - environment: - <<: *common-env - AGGREGATOR_URL: aggregator:50051 - MACHINE_ID: ${MACHINE_ID:-local-dev} - COLLECTION_INTERVAL: ${COLLECTION_INTERVAL:-5} - SERVICE_NAME: collector - depends_on: - - aggregator - deploy: - resources: - limits: - memory: 64M - # For actual system metrics, you might need: - # privileged: true - # pid: host - -volumes: - redis-data: - timescale-data: - -networks: - default: - name: sysmonstm diff --git a/docker-compose.yml b/docker-compose.yml new file mode 120000 index 0000000..9d9447d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1 @@ +ctrl/dev/docker-compose.yml \ No newline at end of file