From 116d4032e210b3d60bf80dfafc433f895144bcf9 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Mon, 29 Dec 2025 14:40:06 -0300 Subject: [PATCH] first claude draft --- .gitignore | 1 + .woodpecker.yml | 184 +++++++ .woodpecker/build.yml | 43 ++ .woodpecker/deploy.yml | 61 +++ .woodpecker/test.yml | 40 ++ CLAUDE.md | 492 ++++++++++++++++++ Tiltfile | 119 +++++ ctlptl.yaml | 32 ++ docker-compose.override.yml | 48 ++ docker-compose.yml | 154 ++++++ docs/architecture/01-system-overview.dot | 78 +++ docs/architecture/01-system-overview.svg | 193 +++++++ docs/architecture/02-data-flow.dot | 83 +++ docs/architecture/02-data-flow.svg | 217 ++++++++ docs/architecture/03-deployment.dot | 95 ++++ docs/architecture/03-deployment.svg | 221 ++++++++ docs/architecture/04-grpc-services.dot | 67 +++ docs/architecture/04-grpc-services.svg | 171 ++++++ docs/architecture/graph.html | 120 +++++ docs/architecture/index.html | 207 ++++++++ docs/architecture/styles.css | 343 ++++++++++++ infra/aws/lambdas/aggregator/placeholder.txt | 1 + infra/aws/lambdas/aggregator/placeholder.zip | Bin 0 -> 192 bytes infra/aws/lambdas/compactor/placeholder.txt | 1 + infra/aws/lambdas/compactor/placeholder.zip | Bin 0 -> 192 bytes infra/aws/terraform/ec2.tf | 148 ++++++ infra/aws/terraform/lambda.tf | 203 ++++++++ infra/aws/terraform/main.tf | 58 +++ infra/aws/terraform/outputs.tf | 36 ++ infra/aws/terraform/terraform.tfvars.example | 16 + infra/aws/terraform/variables.tf | 70 +++ k8s/base/aggregator/configmap.yaml | 15 + k8s/base/aggregator/deployment.yaml | 46 ++ k8s/base/aggregator/kustomization.yaml | 11 + k8s/base/aggregator/service.yaml | 11 + k8s/base/alerts/configmap.yaml | 14 + k8s/base/alerts/deployment.yaml | 33 ++ k8s/base/alerts/kustomization.yaml | 10 + k8s/base/gateway/configmap.yaml | 16 + k8s/base/gateway/deployment.yaml | 48 ++ k8s/base/gateway/kustomization.yaml | 11 + k8s/base/gateway/service.yaml | 11 + k8s/base/kustomization.yaml | 17 + k8s/base/namespace.yaml | 6 + 
k8s/base/redis/deployment.yaml | 37 ++ k8s/base/redis/kustomization.yaml | 10 + k8s/base/redis/service.yaml | 11 + k8s/base/timescaledb/configmap.yaml | 94 ++++ k8s/base/timescaledb/kustomization.yaml | 11 + k8s/base/timescaledb/service.yaml | 12 + k8s/base/timescaledb/statefulset.yaml | 65 +++ k8s/overlays/local/kustomization.yaml | 22 + .../local/patches/reduce-resources.yaml | 50 ++ k8s/overlays/local/secrets.yaml | 8 + proto/metrics.proto | 159 ++++++ scripts/generate-diagrams.sh | 22 + scripts/init-db.sql | 158 ++++++ services/aggregator/Dockerfile | 47 ++ services/aggregator/requirements.txt | 9 + services/alerts/Dockerfile | 35 ++ services/alerts/requirements.txt | 6 + services/collector/Dockerfile | 55 ++ services/collector/requirements.txt | 7 + services/gateway/Dockerfile | 44 ++ services/gateway/requirements.txt | 13 + shared/events/__init__.py | 34 ++ shared/events/base.py | 117 +++++ shared/events/factory.py | 101 ++++ shared/events/redis_pubsub.py | 142 +++++ 69 files changed, 5020 insertions(+) create mode 100644 .gitignore create mode 100644 .woodpecker.yml create mode 100644 .woodpecker/build.yml create mode 100644 .woodpecker/deploy.yml create mode 100644 .woodpecker/test.yml create mode 100644 CLAUDE.md create mode 100644 Tiltfile create mode 100644 ctlptl.yaml create mode 100644 docker-compose.override.yml create mode 100644 docker-compose.yml create mode 100644 docs/architecture/01-system-overview.dot create mode 100644 docs/architecture/01-system-overview.svg create mode 100644 docs/architecture/02-data-flow.dot create mode 100644 docs/architecture/02-data-flow.svg create mode 100644 docs/architecture/03-deployment.dot create mode 100644 docs/architecture/03-deployment.svg create mode 100644 docs/architecture/04-grpc-services.dot create mode 100644 docs/architecture/04-grpc-services.svg create mode 100644 docs/architecture/graph.html create mode 100644 docs/architecture/index.html create mode 100644 docs/architecture/styles.css create mode 
100644 infra/aws/lambdas/aggregator/placeholder.txt create mode 100644 infra/aws/lambdas/aggregator/placeholder.zip create mode 100644 infra/aws/lambdas/compactor/placeholder.txt create mode 100644 infra/aws/lambdas/compactor/placeholder.zip create mode 100644 infra/aws/terraform/ec2.tf create mode 100644 infra/aws/terraform/lambda.tf create mode 100644 infra/aws/terraform/main.tf create mode 100644 infra/aws/terraform/outputs.tf create mode 100644 infra/aws/terraform/terraform.tfvars.example create mode 100644 infra/aws/terraform/variables.tf create mode 100644 k8s/base/aggregator/configmap.yaml create mode 100644 k8s/base/aggregator/deployment.yaml create mode 100644 k8s/base/aggregator/kustomization.yaml create mode 100644 k8s/base/aggregator/service.yaml create mode 100644 k8s/base/alerts/configmap.yaml create mode 100644 k8s/base/alerts/deployment.yaml create mode 100644 k8s/base/alerts/kustomization.yaml create mode 100644 k8s/base/gateway/configmap.yaml create mode 100644 k8s/base/gateway/deployment.yaml create mode 100644 k8s/base/gateway/kustomization.yaml create mode 100644 k8s/base/gateway/service.yaml create mode 100644 k8s/base/kustomization.yaml create mode 100644 k8s/base/namespace.yaml create mode 100644 k8s/base/redis/deployment.yaml create mode 100644 k8s/base/redis/kustomization.yaml create mode 100644 k8s/base/redis/service.yaml create mode 100644 k8s/base/timescaledb/configmap.yaml create mode 100644 k8s/base/timescaledb/kustomization.yaml create mode 100644 k8s/base/timescaledb/service.yaml create mode 100644 k8s/base/timescaledb/statefulset.yaml create mode 100644 k8s/overlays/local/kustomization.yaml create mode 100644 k8s/overlays/local/patches/reduce-resources.yaml create mode 100644 k8s/overlays/local/secrets.yaml create mode 100644 proto/metrics.proto create mode 100755 scripts/generate-diagrams.sh create mode 100644 scripts/init-db.sql create mode 100644 services/aggregator/Dockerfile create mode 100644 
services/aggregator/requirements.txt create mode 100644 services/alerts/Dockerfile create mode 100644 services/alerts/requirements.txt create mode 100644 services/collector/Dockerfile create mode 100644 services/collector/requirements.txt create mode 100644 services/gateway/Dockerfile create mode 100644 services/gateway/requirements.txt create mode 100644 shared/events/__init__.py create mode 100644 shared/events/base.py create mode 100644 shared/events/factory.py create mode 100644 shared/events/redis_pubsub.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..24c5735 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +def diff --git a/.woodpecker.yml b/.woodpecker.yml new file mode 100644 index 0000000..06dd3c6 --- /dev/null +++ b/.woodpecker.yml @@ -0,0 +1,184 @@ +# Woodpecker CI Pipeline +# https://woodpecker-ci.org/docs/usage/pipeline-syntax + +variables: + - &python_image python:3.11-slim + - &docker_image docker:24-dind + +# Clone settings +clone: + git: + image: woodpeckerci/plugin-git + settings: + depth: 50 + +# Pipeline steps +steps: + # ========================================================================== + # Lint and Test + # ========================================================================== + + lint: + image: *python_image + commands: + - pip install ruff mypy + - ruff check services/ shared/ + - ruff format --check services/ shared/ + when: + event: [push, pull_request] + + test-shared: + image: *python_image + commands: + - pip install pytest pytest-asyncio redis asyncpg + - pip install -r shared/events/requirements.txt || true + - pytest shared/ -v --tb=short + when: + event: [push, pull_request] + + test-services: + image: *python_image + commands: + - pip install pytest pytest-asyncio grpcio grpcio-tools + - | + for svc in collector aggregator gateway alerts; do + if [ -f "services/$svc/requirements.txt" ]; then + pip install -r "services/$svc/requirements.txt" + fi + done + - pytest services/ -v --tb=short || true + 
when: + event: [push, pull_request] + + # ========================================================================== + # Build Docker Images + # ========================================================================== + + build-aggregator: + image: *docker_image + commands: + - docker build -t sysmonstm/aggregator:${CI_COMMIT_SHA:0:7} -f services/aggregator/Dockerfile --target production . + - docker tag sysmonstm/aggregator:${CI_COMMIT_SHA:0:7} sysmonstm/aggregator:latest + volumes: + - /var/run/docker.sock:/var/run/docker.sock + when: + event: push + branch: main + + build-gateway: + image: *docker_image + commands: + - docker build -t sysmonstm/gateway:${CI_COMMIT_SHA:0:7} -f services/gateway/Dockerfile --target production . + - docker tag sysmonstm/gateway:${CI_COMMIT_SHA:0:7} sysmonstm/gateway:latest + volumes: + - /var/run/docker.sock:/var/run/docker.sock + when: + event: push + branch: main + + build-collector: + image: *docker_image + commands: + - docker build -t sysmonstm/collector:${CI_COMMIT_SHA:0:7} -f services/collector/Dockerfile --target production . + - docker tag sysmonstm/collector:${CI_COMMIT_SHA:0:7} sysmonstm/collector:latest + volumes: + - /var/run/docker.sock:/var/run/docker.sock + when: + event: push + branch: main + + build-alerts: + image: *docker_image + commands: + - docker build -t sysmonstm/alerts:${CI_COMMIT_SHA:0:7} -f services/alerts/Dockerfile --target production . 
+ - docker tag sysmonstm/alerts:${CI_COMMIT_SHA:0:7} sysmonstm/alerts:latest + volumes: + - /var/run/docker.sock:/var/run/docker.sock + when: + event: push + branch: main + + # ========================================================================== + # Push to Registry + # ========================================================================== + + push-images: + image: *docker_image + commands: + - echo "$REGISTRY_PASSWORD" | docker login -u "$REGISTRY_USER" --password-stdin "$REGISTRY_URL" + - | + for img in aggregator gateway collector alerts; do + docker tag sysmonstm/$img:latest $REGISTRY_URL/sysmonstm/$img:${CI_COMMIT_SHA:0:7} + docker tag sysmonstm/$img:latest $REGISTRY_URL/sysmonstm/$img:latest + docker push $REGISTRY_URL/sysmonstm/$img:${CI_COMMIT_SHA:0:7} + docker push $REGISTRY_URL/sysmonstm/$img:latest + done + secrets: [registry_user, registry_password, registry_url] + volumes: + - /var/run/docker.sock:/var/run/docker.sock + when: + event: push + branch: main + + # ========================================================================== + # Deploy to EC2 + # ========================================================================== + + deploy-staging: + image: appleboy/drone-ssh + settings: + host: + from_secret: deploy_host + username: + from_secret: deploy_user + key: + from_secret: deploy_key + script: + - cd /home/ec2-user/sysmonstm + - git pull origin main + - docker-compose pull + - docker-compose up -d --remove-orphans + - docker system prune -f + when: + event: push + branch: main + + # ========================================================================== + # Notifications + # ========================================================================== + + notify-success: + image: plugins/webhook + settings: + urls: + from_secret: webhook_url + content_type: application/json + template: | + { + "text": "✅ Build succeeded: ${CI_REPO_NAME}#${CI_BUILD_NUMBER}", + "commit": "${CI_COMMIT_SHA:0:7}", + "branch": "${CI_COMMIT_BRANCH}", + 
"author": "${CI_COMMIT_AUTHOR}" + } + when: + status: success + event: push + branch: main + + notify-failure: + image: plugins/webhook + settings: + urls: + from_secret: webhook_url + content_type: application/json + template: | + { + "text": "❌ Build failed: ${CI_REPO_NAME}#${CI_BUILD_NUMBER}", + "commit": "${CI_COMMIT_SHA:0:7}", + "branch": "${CI_COMMIT_BRANCH}", + "author": "${CI_COMMIT_AUTHOR}" + } + when: + status: failure + event: push + branch: main diff --git a/.woodpecker/build.yml b/.woodpecker/build.yml new file mode 100644 index 0000000..7907243 --- /dev/null +++ b/.woodpecker/build.yml @@ -0,0 +1,43 @@ +# Woodpecker CI - Build Pipeline (runs on main branch pushes) + +steps: + build-images: + image: docker:24-dind + commands: + - echo "=== Building Docker images ===" + - docker build -t sysmonstm/aggregator:${CI_COMMIT_SHA:0:7} -f services/aggregator/Dockerfile --target production . + - docker build -t sysmonstm/gateway:${CI_COMMIT_SHA:0:7} -f services/gateway/Dockerfile --target production . + - docker build -t sysmonstm/collector:${CI_COMMIT_SHA:0:7} -f services/collector/Dockerfile --target production . + - docker build -t sysmonstm/alerts:${CI_COMMIT_SHA:0:7} -f services/alerts/Dockerfile --target production . 
+ - echo "=== Tagging as latest ===" + - docker tag sysmonstm/aggregator:${CI_COMMIT_SHA:0:7} sysmonstm/aggregator:latest + - docker tag sysmonstm/gateway:${CI_COMMIT_SHA:0:7} sysmonstm/gateway:latest + - docker tag sysmonstm/collector:${CI_COMMIT_SHA:0:7} sysmonstm/collector:latest + - docker tag sysmonstm/alerts:${CI_COMMIT_SHA:0:7} sysmonstm/alerts:latest + volumes: + - /var/run/docker.sock:/var/run/docker.sock + + push-to-registry: + image: docker:24-dind + commands: + - echo "=== Logging into registry ===" + - echo "$REGISTRY_PASSWORD" | docker login -u "$REGISTRY_USER" --password-stdin "$REGISTRY_URL" + - echo "=== Pushing images ===" + - | + for svc in aggregator gateway collector alerts; do + docker tag sysmonstm/$svc:${CI_COMMIT_SHA:0:7} $REGISTRY_URL/sysmonstm/$svc:${CI_COMMIT_SHA:0:7} + docker tag sysmonstm/$svc:latest $REGISTRY_URL/sysmonstm/$svc:latest + docker push $REGISTRY_URL/sysmonstm/$svc:${CI_COMMIT_SHA:0:7} + docker push $REGISTRY_URL/sysmonstm/$svc:latest + echo "Pushed $svc" + done + secrets: [registry_user, registry_password, registry_url] + volumes: + - /var/run/docker.sock:/var/run/docker.sock + +depends_on: + - test + +when: + event: push + branch: main diff --git a/.woodpecker/deploy.yml b/.woodpecker/deploy.yml new file mode 100644 index 0000000..3cd7a44 --- /dev/null +++ b/.woodpecker/deploy.yml @@ -0,0 +1,61 @@ +# Woodpecker CI - Deploy Pipeline + +steps: + deploy-to-staging: + image: appleboy/drone-ssh + settings: + host: + from_secret: deploy_host + username: + from_secret: deploy_user + key: + from_secret: deploy_key + port: 22 + script: + - echo "=== Deploying to staging ===" + - cd /home/ec2-user/sysmonstm + - git fetch origin main + - git reset --hard origin/main + - echo "=== Pulling new images ===" + - docker-compose pull + - echo "=== Restarting services ===" + - docker-compose up -d --remove-orphans + - echo "=== Cleaning up ===" + - docker system prune -f + - echo "=== Deployment complete ===" + - docker-compose ps + + 
health-check: + image: curlimages/curl + commands: + - echo "=== Waiting for services to start ===" + - sleep 10 + - echo "=== Checking gateway health ===" + - curl -f http://$DEPLOY_HOST:8000/health || exit 1 + - echo "=== Health check passed ===" + secrets: [deploy_host] + + notify: + image: plugins/webhook + settings: + urls: + from_secret: webhook_url + content_type: application/json + template: | + { + "text": "🚀 Deployed to staging", + "repo": "${CI_REPO_NAME}", + "commit": "${CI_COMMIT_SHA:0:7}", + "message": "${CI_COMMIT_MESSAGE}", + "author": "${CI_COMMIT_AUTHOR}", + "url": "https://sysmonstm.mcrn.ar" + } + when: + status: success + +depends_on: + - build + +when: + event: push + branch: main diff --git a/.woodpecker/test.yml b/.woodpecker/test.yml new file mode 100644 index 0000000..1bbefe0 --- /dev/null +++ b/.woodpecker/test.yml @@ -0,0 +1,40 @@ +# Woodpecker CI - Test Pipeline (runs on PRs and pushes) +# Separate file for cleaner organization + +steps: + lint: + image: python:3.11-slim + commands: + - pip install --quiet ruff mypy + - echo "=== Linting with ruff ===" + - ruff check services/ shared/ --output-format=github + - echo "=== Checking formatting ===" + - ruff format --check services/ shared/ + + typecheck: + image: python:3.11-slim + commands: + - pip install --quiet mypy types-redis + - echo "=== Type checking shared/ ===" + - mypy shared/ --ignore-missing-imports || true + + unit-tests: + image: python:3.11-slim + commands: + - pip install --quiet pytest pytest-asyncio pytest-cov + - pip install --quiet redis asyncpg grpcio grpcio-tools psutil pydantic pydantic-settings structlog + - echo "=== Running unit tests ===" + - pytest shared/ services/ -v --tb=short --cov=shared --cov=services --cov-report=term-missing || true + + proto-check: + image: python:3.11-slim + commands: + - pip install --quiet grpcio-tools + - echo "=== Validating proto definitions ===" + - python -m grpc_tools.protoc -I./proto --python_out=/tmp --grpc_python_out=/tmp 
./proto/metrics.proto + - echo "Proto compilation successful" + +depends_on: [] + +when: + event: [push, pull_request] diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..1f78792 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,492 @@ +# Distributed System Monitoring Platform + +## Project Overview + +A real-time system monitoring platform that streams metrics from multiple machines to a central hub with live web dashboard. Built to demonstrate production microservices patterns (gRPC, FastAPI, streaming, event-driven architecture) while solving a real problem: monitoring development infrastructure across multiple machines. + +**Primary Goal:** Interview demonstration project for Python Microservices Engineer position +**Secondary Goal:** Actually useful tool for managing multi-machine development environment +**Time Investment:** Phased approach - MVP in weekend, polish over 2-3 weeks + +## Why This Project + +**Interview Alignment:** +- Demonstrates gRPC-based microservices architecture (core requirement) +- Shows streaming patterns (server-side and bidirectional) +- Real-time data aggregation and processing +- Alert/threshold monitoring (maps to fraud detection) +- Event-driven patterns +- Multiple data sources requiring normalization (maps to multiple payment processors) + +**Personal Utility:** +- Monitors existing multi-machine dev setup +- Dashboard stays open, provides real value +- Solves actual pain point +- Will continue running post-interview + +**Domain Mapping for Interview:** +- Machine = Payment Processor +- Metrics Stream = Transaction Stream +- Resource Thresholds = Fraud/Limit Detection +- Alert System = Risk Management +- Aggregation Service = Payment Processing Hub + +## Technical Stack + +### Core Technologies (Must Use - From JD) +- **Python 3.11+** - Primary language +- **FastAPI** - Web gateway, REST endpoints, WebSocket streaming +- **gRPC** - Inter-service communication, metric streaming +- **PostgreSQL/TimescaleDB** - 
Time-series historical data +- **Redis** - Current state, caching, alert rules +- **Docker Compose** - Orchestration + +### Supporting Technologies +- **Protocol Buffers** - gRPC message definitions +- **WebSockets** - Browser streaming +- **htmx + Alpine.js** - Lightweight reactive frontend (avoid heavy SPA) +- **Chart.js or Apache ECharts** - Real-time graphs +- **asyncio** - Async patterns throughout + +### Development Tools +- **grpcio & grpcio-tools** - Python gRPC +- **psutil** - System metrics collection +- **uvicorn** - FastAPI server +- **pytest** - Testing +- **docker-compose** - Local orchestration + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Browser │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ Dashboard (htmx + Alpine.js + WebSockets) │ │ +│ └──────────────────────────────────────────────────────┘ │ +└────────────────────────┬────────────────────────────────────┘ + │ WebSocket + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Web Gateway Service │ +│ (FastAPI + WebSockets) │ +│ - Serves dashboard │ +│ - Streams updates to browser │ +│ - REST API for historical queries │ +└────────────────────────┬────────────────────────────────────┘ + │ gRPC + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Aggregator Service (gRPC) │ +│ - Receives metric streams from all collectors │ +│ - Normalizes data from different sources │ +│ - Enriches with machine context │ +│ - Publishes to event stream │ +│ - Checks alert thresholds │ +└─────┬───────────────────────────────────┬───────────────────┘ + │ │ + │ Stores │ Publishes events + ▼ ▼ +┌──────────────┐ ┌────────────────┐ +│ TimescaleDB │ │ Event Stream │ +│ (historical)│ │ (Redis Pub/Sub│ +└──────────────┘ │ or RabbitMQ) │ + └────────┬───────┘ +┌──────────────┐ │ +│ Redis │ │ Subscribes +│ (current │◄───────────────────────────┘ +│ state) │ │ +└──────────────┘ ▼ + ┌────────────────┐ + ▲ │ Alert Service │ 
+ │ │ - Processes │ + │ │ events │ + │ gRPC Streaming │ - Triggers │ + │ │ actions │ +┌─────┴────────────────────────────┴────────────────┘ +│ +│ Multiple Collector Services (one per machine) +│ ┌───────────────────────────────────────┐ +│ │ Metrics Collector (gRPC Client) │ +│ │ - Gathers system metrics (psutil) │ +│ │ - Streams to Aggregator via gRPC │ +│ │ - CPU, Memory, Disk, Network │ +│ │ - Process list │ +│ │ - Docker container stats (optional) │ +│ └───────────────────────────────────────┘ +│ +└──► Machine 1, Machine 2, Machine 3, ... +``` + +## Implementation Phases + +### Phase 1: MVP - Core Streaming (Weekend - 8-12 hours) + +**Goal:** Prove the gRPC streaming works end-to-end + +**Deliverables:** +1. Metrics Collector Service (gRPC client) + - Collects CPU, memory, disk on localhost + - Streams to aggregator every 5 seconds + +2. Aggregator Service (gRPC server) + - Receives metric stream + - Stores current state in Redis + - Logs to console + +3. Proto definitions for metric messages + +4. Docker Compose setup + +**Success Criteria:** +- Run collector, see metrics flowing to aggregator +- Redis contains current state +- Can query Redis manually for latest metrics + +### Phase 2: Web Dashboard (1 week) + +**Goal:** Make it visible and useful + +**Deliverables:** +1. Web Gateway Service (FastAPI) + - WebSocket endpoint for streaming + - REST endpoints for current/historical data + +2. Dashboard UI + - Real-time CPU/Memory graphs per machine + - Current state table + - Simple, clean design + +3. WebSocket bridge (Gateway ↔ Aggregator) + +4. TimescaleDB integration + - Store historical metrics + - Query endpoints for time ranges + +**Success Criteria:** +- Open dashboard, see live graphs updating +- Graphs show last hour of data +- Multiple machines displayed separately + +### Phase 3: Alerts & Intelligence (1 week) + +**Goal:** Add decision-making layer (interview focus) + +**Deliverables:** +1. 
Alert Service + - Subscribes to event stream + - Evaluates threshold rules + - Triggers notifications + +2. Configuration Service (gRPC) + - Dynamic threshold management + - Alert rule CRUD + - Stored in PostgreSQL + +3. Event Stream implementation (Redis Pub/Sub or RabbitMQ) + +4. Enhanced dashboard + - Alert indicators + - Alert history + - Threshold configuration UI + +**Success Criteria:** +- Set CPU threshold at 80% +- Generate load (stress-ng) +- See alert trigger in dashboard +- Alert logged to database + +### Phase 4: Interview Polish (Final week) + +**Goal:** Demo-ready, production patterns visible + +**Deliverables:** +1. Observability + - OpenTelemetry tracing (optional) + - Structured logging + - Health check endpoints + +2. "Synthetic Transactions" + - Simulate business operations through system + - Track end-to-end latency + - Maps directly to payment processing demo + +3. Documentation + - Architecture diagram + - Service interaction flows + - Deployment guide + +4. Demo script + - Story to walk through + - Key talking points + - Domain mapping explanations + +**Success Criteria:** +- Can deploy entire stack with one command +- Can explain every service's role +- Can map architecture to payment processing +- Demo runs smoothly without hiccups + +## Key Technical Patterns to Demonstrate + +### 1. gRPC Streaming Patterns + +**Server-Side Streaming:** +```protobuf +// Collector streams metrics to aggregator +service MetricsService { + rpc StreamMetrics(MetricsRequest) returns (stream Metric) {} +} +``` + +**Bidirectional Streaming:** +```protobuf +// Two-way communication between services +service ControlService { + rpc ManageStream(stream Command) returns (stream Response) {} +} +``` + +### 2. Service Communication Patterns + +- **Synchronous (gRPC):** Query current state, configuration +- **Asynchronous (Events):** Metric updates, alerts, audit logs +- **Streaming (gRPC + WebSocket):** Real-time data flow + +### 3.
Data Storage Patterns + +- **Hot data (Redis):** Current state, recent metrics (last 5 minutes) +- **Warm data (TimescaleDB):** Historical metrics (last 30 days) +- **Cold data (Optional):** Archive to S3-compatible storage + +### 4. Error Handling & Resilience + +- gRPC retry logic with exponential backoff +- Circuit breaker pattern for service calls +- Graceful degradation (continue if one collector fails) +- Dead letter queue for failed events + +## Proto Definitions (Starting Point) + +```protobuf +syntax = "proto3"; + +package monitoring; + +service MetricsService { + rpc StreamMetrics(MetricsRequest) returns (stream Metric) {} + rpc GetCurrentState(StateRequest) returns (MachineState) {} +} + +message MetricsRequest { + string machine_id = 1; + int32 interval_seconds = 2; +} + +message Metric { + string machine_id = 1; + int64 timestamp = 2; + MetricType type = 3; + double value = 4; + map<string, string> labels = 5; +} + +enum MetricType { + CPU_PERCENT = 0; + MEMORY_PERCENT = 1; + MEMORY_USED_GB = 2; + DISK_PERCENT = 3; + NETWORK_SENT_MBPS = 4; + NETWORK_RECV_MBPS = 5; +} + +message MachineState { + string machine_id = 1; + int64 last_seen = 2; + repeated Metric current_metrics = 3; + HealthStatus health = 4; +} + +enum HealthStatus { + HEALTHY = 0; + WARNING = 1; + CRITICAL = 2; + UNKNOWN = 3; +} +``` + +## Project Structure + +``` +system-monitor/ +├── docker-compose.yml +├── proto/ +│ └── metrics.proto +├── services/ +│ ├── collector/ +│ │ ├── Dockerfile +│ │ ├── requirements.txt +│ │ ├── main.py +│ │ └── metrics.py +│ ├── aggregator/ +│ │ ├── Dockerfile +│ │ ├── requirements.txt +│ │ ├── main.py +│ │ └── storage.py +│ ├── gateway/ +│ │ ├── Dockerfile +│ │ ├── requirements.txt +│ │ ├── main.py +│ │ └── websocket.py +│ └── alerts/ +│ ├── Dockerfile +│ ├── requirements.txt +│ ├── main.py +│ └── rules.py +├── web/ +│ ├── static/ +│ │ ├── css/ +│ │ └── js/ +│ └── templates/ +│ └── dashboard.html +└── README.md +``` + +## Interview Talking Points + +### Domain Mapping to
Payments + +**What you say:** +- "I built this to monitor my dev machines, but the architecture directly maps to payment processing" +- "Each machine streaming metrics is like a payment processor streaming transactions" +- "The aggregator normalizes data from different sources - same as aggregating from Stripe, PayPal, bank APIs" +- "Alert thresholds on resource usage are structurally identical to fraud detection thresholds" +- "The event stream for audit trails maps directly to payment audit logs" + +### Technical Decisions to Highlight + +**gRPC vs REST:** +- "I use gRPC between services for efficiency and strong typing" +- "FastAPI gateway exposes REST/WebSocket for browser clients" +- "This pattern is common - internal gRPC, external REST" + +**Streaming vs Polling:** +- "Server-side streaming reduces network overhead" +- "Bidirectional streaming allows dynamic configuration updates" +- "WebSocket to browser maintains single connection" + +**State Management:** +- "Redis for hot data - current state, needs fast access" +- "TimescaleDB for historical analysis - optimized for time-series" +- "This tiered storage approach scales to payment transaction volumes" + +**Resilience:** +- "Each collector is independent - one failing doesn't affect others" +- "Circuit breaker prevents cascade failures" +- "Event stream decouples alert processing from metric ingestion" + +### What NOT to Say + +- Don't call it a "toy project" or "learning exercise" +- Don't apologize for running locally vs AWS +- Don't over-explain obvious things +- Don't claim it's production-ready when it's not + +### What TO Say + +- "I built this to solve a real problem I have" +- "Locally it uses PostgreSQL/Redis, in production these become Aurora/ElastiCache" +- "I focused on the architectural patterns since those transfer directly" +- "I'd keep developing this - it's genuinely useful" + +## Development Guidelines + +### Code Quality Standards +- Type hints throughout (Python 3.11+ syntax) +- 
Async/await patterns consistently +- Structured logging (JSON format) +- Error handling at all boundaries +- Unit tests for business logic +- Integration tests for service interactions + +### Docker Best Practices +- Multi-stage builds +- Non-root users +- Health checks +- Resource limits +- Volume mounts for development + +### Configuration Management +- Environment variables for all config +- Sensible defaults +- Config validation on startup +- No secrets in code + +## AWS Mapping (For Interview Discussion) + +**What you have → What it becomes:** +- PostgreSQL → Aurora PostgreSQL +- Redis → ElastiCache +- Docker Containers → ECS/Fargate or Lambda +- RabbitMQ/Redis Pub/Sub → SQS/SNS +- Docker Compose → CloudFormation/Terraform +- Local networking → VPC, Security Groups + +**Key point:** "The architecture and patterns are production-ready, the infrastructure is local for development convenience" + +## Common Pitfalls to Avoid + +1. **Over-engineering Phase 1** - Resist adding features, just get streaming working +2. **Ugly UI** - Don't waste time on design, htmx + basic CSS is fine +3. **Perfect metrics** - Mock data is OK early on, real psutil data comes later +4. **Complete coverage** - Better to have 3 services working perfectly than 10 half-done +5. **AWS deployment** - Local is fine, AWS costs money and adds complexity + +## Success Metrics + +**For Yourself:** +- [ ] Actually use the dashboard daily +- [ ] Catches a real issue before you notice +- [ ] Runs stable for 1+ week without intervention + +**For Interview:** +- [ ] Can demo end-to-end in 5 minutes +- [ ] Can explain every service interaction +- [ ] Can map to payment domain fluently +- [ ] Shows understanding of production patterns + +## Next Steps + +1. Set up project structure +2. Define proto messages +3. Build Phase 1 MVP +4. Iterate based on what feels useful +5. 
Polish for demo when interview approaches + +## Resources + +- gRPC Python docs: https://grpc.io/docs/languages/python/ +- FastAPI WebSockets: https://fastapi.tiangolo.com/advanced/websockets/ +- TimescaleDB: https://docs.timescale.com/ +- htmx: https://htmx.org/ + +## Questions to Ask Yourself During Development + +- "Would I actually use this feature?" +- "How does this map to payments?" +- "Can I explain why I built it this way?" +- "What would break if X service failed?" +- "How would this scale to 1000 machines?" + +--- + +## Final Note + +This project works because it's: +1. **Real** - You'll use it +2. **Focused** - Shows specific patterns they care about +3. **Mappable** - Clear connection to their domain +4. **Yours** - Not a tutorial copy, demonstrates your thinking + +Build it in phases, use it daily, and by interview time you'll have natural stories about trade-offs, failures, and learnings. That authenticity is more valuable than perfect code. + +Good luck! 🚀 diff --git a/Tiltfile b/Tiltfile new file mode 100644 index 0000000..5e70c3f --- /dev/null +++ b/Tiltfile @@ -0,0 +1,119 @@ +# -*- mode: Python -*- +# Tiltfile for sysmonstm - local Kubernetes development + +# Load extensions +load('ext://restart_process', 'docker_build_with_restart') +load('ext://namespace', 'namespace_create') + +# Configuration +config.define_bool("no-volumes") +cfg = config.parse() +no_volumes = cfg.get("no-volumes", False) + +# Create namespace +namespace_create('sysmonstm') +k8s_yaml(kustomize('k8s/overlays/local')) + +# ============================================================================ +# Docker builds with live reload +# ============================================================================ + +# Aggregator service +docker_build( + 'sysmonstm-aggregator', + context='.', + dockerfile='services/aggregator/Dockerfile', + target='development', + live_update=[ + sync('./services/aggregator', '/app/services/aggregator'), + sync('./shared', '/app/shared'), + 
sync('./proto', '/app/proto'), + ], +) + +# Gateway service +docker_build( + 'sysmonstm-gateway', + context='.', + dockerfile='services/gateway/Dockerfile', + target='development', + live_update=[ + sync('./services/gateway', '/app/services/gateway'), + sync('./shared', '/app/shared'), + sync('./proto', '/app/proto'), + sync('./web', '/app/web'), + ], +) + +# Alerts service +docker_build( + 'sysmonstm-alerts', + context='.', + dockerfile='services/alerts/Dockerfile', + target='development', + live_update=[ + sync('./services/alerts', '/app/services/alerts'), + sync('./shared', '/app/shared'), + ], +) + +# ============================================================================ +# Resource configuration +# ============================================================================ + +# Infrastructure +k8s_resource('redis', labels=['infra']) +k8s_resource('timescaledb', labels=['infra']) + +# Application services +k8s_resource( + 'aggregator', + labels=['app'], + resource_deps=['redis', 'timescaledb'], + port_forwards=['50051:50051'], +) + +k8s_resource( + 'gateway', + labels=['app'], + resource_deps=['aggregator', 'redis'], + port_forwards=['8000:8000'], +) + +k8s_resource( + 'alerts', + labels=['app'], + resource_deps=['redis', 'timescaledb'], +) + +# ============================================================================ +# Local resources (optional - for running collector locally) +# ============================================================================ + +local_resource( + 'collector-local', + serve_cmd='cd services/collector && python main.py', + deps=['services/collector', 'shared'], + resource_deps=['aggregator'], + labels=['collector'], + auto_init=False, # Don't start automatically + env={ + 'AGGREGATOR_URL': 'localhost:50051', + 'MACHINE_ID': 'tilt-dev', + 'COLLECTION_INTERVAL': '5', + 'LOG_LEVEL': 'DEBUG', + 'PYTHONPATH': '.', + }, +) + +# ============================================================================ +# Convenience buttons 
+# ============================================================================ + +local_resource( + 'proto-gen', + cmd='python -m grpc_tools.protoc -I./proto --python_out=./shared --grpc_python_out=./shared ./proto/metrics.proto', + deps=['proto/metrics.proto'], + labels=['tools'], + auto_init=False, +) diff --git a/ctlptl.yaml b/ctlptl.yaml new file mode 100644 index 0000000..b81ba07 --- /dev/null +++ b/ctlptl.yaml @@ -0,0 +1,32 @@ +# ctlptl configuration for Kind cluster +# Usage: ctlptl apply -f ctlptl.yaml + +apiVersion: ctlptl.dev/v1alpha1 +kind: Registry +name: sysmonstm-registry +port: 5005 +--- +apiVersion: ctlptl.dev/v1alpha1 +kind: Cluster +product: kind +registry: sysmonstm-registry +kindV1Alpha4Cluster: + name: sysmonstm + nodes: + - role: control-plane + extraPortMappings: + # Gateway HTTP + - containerPort: 30080 + hostPort: 8080 + protocol: TCP + # Aggregator gRPC + - containerPort: 30051 + hostPort: 50051 + protocol: TCP + # Resource limits for t2.small compatibility + kubeadmConfigPatches: + - | + kind: InitConfiguration + nodeRegistration: + kubeletExtraArgs: + system-reserved: memory=256Mi diff --git a/docker-compose.override.yml b/docker-compose.override.yml new file mode 100644 index 0000000..7a0513a --- /dev/null +++ b/docker-compose.override.yml @@ -0,0 +1,48 @@ +# Development overrides - hot reload, mounted volumes, debug settings +# Usage: docker compose up (automatically includes this file) + +version: "3.8" + +services: + aggregator: + build: + target: development + volumes: + - ./services/aggregator:/app/services/aggregator:ro + - ./shared:/app/shared:ro + - ./proto:/app/proto:ro + environment: + LOG_LEVEL: DEBUG + RELOAD: "true" + + gateway: + build: + target: development + volumes: + - ./services/gateway:/app/services/gateway:ro + - ./shared:/app/shared:ro + - ./proto:/app/proto:ro + - ./web:/app/web:ro + environment: + LOG_LEVEL: DEBUG + RELOAD: "true" + + alerts: + build: + target: development + volumes: + - 
./services/alerts:/app/services/alerts:ro + - ./shared:/app/shared:ro + environment: + LOG_LEVEL: DEBUG + + collector: + build: + target: development + volumes: + - ./services/collector:/app/services/collector:ro + - ./shared:/app/shared:ro + - ./proto:/app/proto:ro + environment: + LOG_LEVEL: DEBUG + COLLECTION_INTERVAL: 2 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..efc32da --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,154 @@ +version: "3.8" + +# This file works both locally and on EC2 for demo purposes. +# For local dev with hot-reload, use: docker compose -f docker-compose.yml -f docker-compose.override.yml up + +x-common-env: &common-env + REDIS_URL: redis://redis:6379 + TIMESCALE_URL: postgresql://monitor:monitor@timescaledb:5432/monitor + EVENTS_BACKEND: redis_pubsub + LOG_LEVEL: ${LOG_LEVEL:-INFO} + LOG_FORMAT: json + +x-healthcheck-defaults: &healthcheck-defaults + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s + +services: + # ============================================================================= + # Infrastructure + # ============================================================================= + + redis: + image: redis:7-alpine + ports: + - "${REDIS_PORT:-6379}:6379" + volumes: + - redis-data:/data + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "redis-cli", "ping"] + deploy: + resources: + limits: + memory: 128M + + timescaledb: + image: timescale/timescaledb:latest-pg15 + environment: + POSTGRES_USER: monitor + POSTGRES_PASSWORD: monitor + POSTGRES_DB: monitor + ports: + - "${TIMESCALE_PORT:-5432}:5432" + volumes: + - timescale-data:/var/lib/postgresql/data + - ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro + healthcheck: + <<: *healthcheck-defaults + test: ["CMD-SHELL", "pg_isready -U monitor -d monitor"] + deploy: + resources: + limits: + memory: 512M + + # ============================================================================= + # Application 
Services + # ============================================================================= + + aggregator: + build: + context: . + dockerfile: services/aggregator/Dockerfile + environment: + <<: *common-env + GRPC_PORT: 50051 + SERVICE_NAME: aggregator + ports: + - "${AGGREGATOR_GRPC_PORT:-50051}:50051" + depends_on: + redis: + condition: service_healthy + timescaledb: + condition: service_healthy + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "/bin/grpc_health_probe", "-addr=:50051"] + deploy: + resources: + limits: + memory: 256M + + gateway: + build: + context: . + dockerfile: services/gateway/Dockerfile + environment: + <<: *common-env + HTTP_PORT: 8000 + AGGREGATOR_URL: aggregator:50051 + SERVICE_NAME: gateway + ports: + - "${GATEWAY_PORT:-8000}:8000" + depends_on: + - aggregator + - redis + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + deploy: + resources: + limits: + memory: 256M + + alerts: + build: + context: . + dockerfile: services/alerts/Dockerfile + environment: + <<: *common-env + SERVICE_NAME: alerts + depends_on: + redis: + condition: service_healthy + timescaledb: + condition: service_healthy + healthcheck: + <<: *healthcheck-defaults + test: ["CMD", "python", "-c", "import sys; sys.exit(0)"] + deploy: + resources: + limits: + memory: 128M + + # Collector runs separately on each machine being monitored + # For local testing, we run one instance + collector: + build: + context: . 
+ dockerfile: services/collector/Dockerfile + environment: + <<: *common-env + AGGREGATOR_URL: aggregator:50051 + MACHINE_ID: ${MACHINE_ID:-local-dev} + COLLECTION_INTERVAL: ${COLLECTION_INTERVAL:-5} + SERVICE_NAME: collector + depends_on: + - aggregator + deploy: + resources: + limits: + memory: 64M + # For actual system metrics, you might need: + # privileged: true + # pid: host + +volumes: + redis-data: + timescale-data: + +networks: + default: + name: sysmonstm diff --git a/docs/architecture/01-system-overview.dot b/docs/architecture/01-system-overview.dot new file mode 100644 index 0000000..c9bc4b0 --- /dev/null +++ b/docs/architecture/01-system-overview.dot @@ -0,0 +1,78 @@ +digraph SystemOverview { + // Graph settings + rankdir=TB; + compound=true; + fontname="Helvetica"; + node [fontname="Helvetica", fontsize=11]; + edge [fontname="Helvetica", fontsize=10]; + + // Title + labelloc="t"; + label="System Monitoring Platform - Architecture Overview"; + fontsize=16; + + // Styling + node [shape=box, style="rounded,filled"]; + + // External + subgraph cluster_external { + label="External"; + style=dashed; + color=gray; + + browser [label="Browser\n(Dashboard)", fillcolor="#E3F2FD"]; + machines [label="Monitored\nMachines", fillcolor="#FFF3E0", shape=box3d]; + } + + // Core Services + subgraph cluster_services { + label="Application Services"; + style=filled; + color="#E8F5E9"; + fillcolor="#E8F5E9"; + + gateway [label="Gateway\n(FastAPI)", fillcolor="#C8E6C9"]; + aggregator [label="Aggregator\n(gRPC Server)", fillcolor="#C8E6C9"]; + alerts [label="Alerts\nService", fillcolor="#C8E6C9"]; + collector [label="Collector\n(gRPC Client)", fillcolor="#DCEDC8"]; + } + + // Data Layer + subgraph cluster_data { + label="Data Layer"; + style=filled; + color="#FFF8E1"; + fillcolor="#FFF8E1"; + + redis [label="Redis\n(Pub/Sub + State)", fillcolor="#FFECB3", shape=cylinder]; + timescale [label="TimescaleDB\n(Time-series)", fillcolor="#FFECB3", shape=cylinder]; + } + + // Event 
Stream + subgraph cluster_events { + label="Event Stream"; + style=filled; + color="#F3E5F5"; + fillcolor="#F3E5F5"; + + events [label="Redis Pub/Sub\n(Events)", fillcolor="#E1BEE7", shape=hexagon]; + } + + // Connections + browser -> gateway [label="WebSocket\nREST", color="#1976D2"]; + gateway -> aggregator [label="gRPC", color="#388E3C"]; + gateway -> redis [label="State\nQuery", style=dashed]; + gateway -> timescale [label="Historical\nQuery", style=dashed]; + + machines -> collector [label="psutil", color="#F57C00", style=dotted]; + collector -> aggregator [label="gRPC\nStream", color="#388E3C"]; + + aggregator -> redis [label="Current\nState", color="#FFA000"]; + aggregator -> timescale [label="Store\nMetrics", color="#FFA000"]; + aggregator -> events [label="Publish", color="#7B1FA2"]; + + events -> alerts [label="Subscribe", color="#7B1FA2"]; + events -> gateway [label="Subscribe", color="#7B1FA2"]; + + alerts -> timescale [label="Store\nAlerts", style=dashed]; +} diff --git a/docs/architecture/01-system-overview.svg b/docs/architecture/01-system-overview.svg new file mode 100644 index 0000000..aefe71c --- /dev/null +++ b/docs/architecture/01-system-overview.svg @@ -0,0 +1,193 @@ + + + + + + +SystemOverview + +System Monitoring Platform - Architecture Overview + +cluster_external + +External + + +cluster_services + +Application Services + + +cluster_data + +Data Layer + + +cluster_events + +Event Stream + + + +browser + +Browser +(Dashboard) + + + +gateway + +Gateway +(FastAPI) + + + +browser->gateway + + +WebSocket +REST + + + +machines + + + + +Monitored +Machines + + + +collector + +Collector +(gRPC Client) + + + +machines->collector + + +psutil + + + +aggregator + +Aggregator +(gRPC Server) + + + +gateway->aggregator + + +gRPC + + + +redis + + +Redis +(Pub/Sub + State) + + + +gateway->redis + + +State +Query + + + +timescale + + +TimescaleDB +(Time-series) + + + +gateway->timescale + + +Historical +Query + + + +aggregator->redis + + +Current +State + + 
+ +aggregator->timescale + + +Store +Metrics + + + +events + +Redis Pub/Sub +(Events) + + + +aggregator->events + + +Publish + + + +alerts + +Alerts +Service + + + +alerts->timescale + + +Store +Alerts + + + +collector->aggregator + + +gRPC +Stream + + + +events->gateway + + +Subscribe + + + +events->alerts + + +Subscribe + + + diff --git a/docs/architecture/02-data-flow.dot b/docs/architecture/02-data-flow.dot new file mode 100644 index 0000000..ac77851 --- /dev/null +++ b/docs/architecture/02-data-flow.dot @@ -0,0 +1,83 @@ +digraph DataFlow { + rankdir=LR; + compound=true; + fontname="Helvetica"; + node [fontname="Helvetica", fontsize=10]; + edge [fontname="Helvetica", fontsize=9]; + + labelloc="t"; + label="Metrics Data Flow Pipeline"; + fontsize=14; + + node [shape=box, style="rounded,filled"]; + + // Collection + subgraph cluster_collect { + label="Collection (5s)"; + style=filled; + fillcolor="#E3F2FD"; + + psutil [label="psutil\n(CPU, Mem, Disk)", shape=component, fillcolor="#BBDEFB"]; + collector [label="Collector\nService", fillcolor="#90CAF9"]; + } + + // Ingestion + subgraph cluster_ingest { + label="Ingestion"; + style=filled; + fillcolor="#E8F5E9"; + + aggregator [label="Aggregator\n(gRPC)", fillcolor="#A5D6A7"]; + validate [label="Validate &\nNormalize", shape=diamond, fillcolor="#C8E6C9"]; + } + + // Storage Hot + subgraph cluster_hot { + label="Hot Path (Real-time)"; + style=filled; + fillcolor="#FFF3E0"; + + redis_state [label="Redis\nCurrent State", shape=cylinder, fillcolor="#FFCC80"]; + redis_pubsub [label="Redis\nPub/Sub", shape=hexagon, fillcolor="#FFB74D"]; + } + + // Storage Warm + subgraph cluster_warm { + label="Warm Path (Historical)"; + style=filled; + fillcolor="#FCE4EC"; + + raw [label="metrics_raw\n(5s, 24h)", shape=cylinder, fillcolor="#F8BBD9"]; + agg_1m [label="metrics_1m\n(1m, 7d)", shape=cylinder, fillcolor="#F48FB1"]; + agg_1h [label="metrics_1h\n(1h, 90d)", shape=cylinder, fillcolor="#EC407A"]; + } + + // Consumers + subgraph 
cluster_consume { + label="Consumers"; + style=filled; + fillcolor="#E8EAF6"; + + alerts [label="Alert\nService", fillcolor="#C5CAE9"]; + gateway [label="Gateway\n(WebSocket)", fillcolor="#9FA8DA"]; + lambda [label="Lambda\nAggregator", fillcolor="#7986CB", style="rounded,filled,dashed"]; + } + + // Flow + psutil -> collector [label="Metrics"]; + collector -> aggregator [label="gRPC\nStream"]; + aggregator -> validate; + + validate -> redis_state [label="Upsert"]; + validate -> redis_pubsub [label="Publish"]; + validate -> raw [label="Insert"]; + + redis_pubsub -> alerts [label="metrics.*"]; + redis_pubsub -> gateway [label="metrics.*"]; + + raw -> agg_1m [label="Continuous\nAggregate", style=dashed]; + agg_1m -> agg_1h [label="Hourly\nJob", style=dashed]; + + raw -> lambda [label="SQS\nTrigger", style=dotted]; + lambda -> agg_1m [label="Batch\nWrite", style=dotted]; +} diff --git a/docs/architecture/02-data-flow.svg b/docs/architecture/02-data-flow.svg new file mode 100644 index 0000000..5735a45 --- /dev/null +++ b/docs/architecture/02-data-flow.svg @@ -0,0 +1,217 @@ + + + + + + +DataFlow + +Metrics Data Flow Pipeline + +cluster_collect + +Collection (5s) + + +cluster_ingest + +Ingestion + + +cluster_hot + +Hot Path (Real-time) + + +cluster_warm + +Warm Path (Historical) + + +cluster_consume + +Consumers + + + +psutil + + + +psutil +(CPU, Mem, Disk) + + + +collector + +Collector +Service + + + +psutil->collector + + +Metrics + + + +aggregator + +Aggregator +(gRPC) + + + +collector->aggregator + + +gRPC +Stream + + + +validate + +Validate & +Normalize + + + +aggregator->validate + + + + + +redis_state + + +Redis +Current State + + + +validate->redis_state + + +Upsert + + + +redis_pubsub + +Redis +Pub/Sub + + + +validate->redis_pubsub + + +Publish + + + +raw + + +metrics_raw +(5s, 24h) + + + +validate->raw + + +Insert + + + +alerts + +Alert +Service + + + +redis_pubsub->alerts + + +metrics.* + + + +gateway + +Gateway +(WebSocket) + + + +redis_pubsub->gateway + + 
+metrics.* + + + +agg_1m + + +metrics_1m +(1m, 7d) + + + +raw->agg_1m + + +Continuous +Aggregate + + + +lambda + +Lambda +Aggregator + + + +raw->lambda + + +SQS +Trigger + + + +agg_1h + + +metrics_1h +(1h, 90d) + + + +agg_1m->agg_1h + + +Hourly +Job + + + +lambda->agg_1m + + +Batch +Write + + + diff --git a/docs/architecture/03-deployment.dot b/docs/architecture/03-deployment.dot new file mode 100644 index 0000000..fe3b29d --- /dev/null +++ b/docs/architecture/03-deployment.dot @@ -0,0 +1,95 @@ +digraph Deployment { + rankdir=TB; + compound=true; + fontname="Helvetica"; + node [fontname="Helvetica", fontsize=10]; + edge [fontname="Helvetica", fontsize=9]; + + labelloc="t"; + label="Deployment Architecture"; + fontsize=14; + + node [shape=box, style="rounded,filled"]; + + // Local Development + subgraph cluster_local { + label="Local Development"; + style=filled; + fillcolor="#E3F2FD"; + + subgraph cluster_kind { + label="Kind Cluster"; + style=filled; + fillcolor="#BBDEFB"; + + tilt [label="Tilt\n(Live Reload)", shape=component, fillcolor="#90CAF9"]; + k8s_local [label="K8s Pods\n(via Kustomize)", fillcolor="#64B5F6"]; + } + + compose [label="Docker Compose\n(Alternative)", fillcolor="#90CAF9", style="rounded,dashed"]; + } + + // AWS Staging/Demo + subgraph cluster_aws { + label="AWS (sysmonstm.mcrn.ar)"; + style=filled; + fillcolor="#E8F5E9"; + + subgraph cluster_ec2 { + label="EC2 t2.small"; + style=filled; + fillcolor="#C8E6C9"; + + compose_ec2 [label="Docker Compose\n(All Services)", fillcolor="#A5D6A7"]; + nginx [label="Nginx\n(SSL Termination)", fillcolor="#81C784"]; + } + + subgraph cluster_lambda { + label="Lambda (Data Processing)"; + style=filled; + fillcolor="#DCEDC8"; + + lambda_agg [label="Aggregator\nLambda", fillcolor="#AED581"]; + lambda_compact [label="Compactor\nLambda", fillcolor="#9CCC65"]; + } + + sqs [label="SQS\n(Buffer)", shape=hexagon, fillcolor="#FFE082"]; + s3 [label="S3\n(Backup)", shape=cylinder, fillcolor="#FFE082"]; + } + + // CI/CD + 
subgraph cluster_cicd { + label="CI/CD"; + style=filled; + fillcolor="#F3E5F5"; + + woodpecker [label="Woodpecker CI", fillcolor="#CE93D8"]; + registry [label="Container\nRegistry", shape=cylinder, fillcolor="#BA68C8"]; + } + + // Collectors (External) + subgraph cluster_collectors { + label="Monitored Machines"; + style=dashed; + color=gray; + + coll1 [label="Collector\n(Machine 1)", fillcolor="#FFCCBC"]; + coll2 [label="Collector\n(Machine 2)", fillcolor="#FFCCBC"]; + coll3 [label="Collector\n(Machine N)", fillcolor="#FFCCBC"]; + } + + // Connections + tilt -> k8s_local [style=invis]; + woodpecker -> registry [label="Push"]; + registry -> compose_ec2 [label="Pull"]; + registry -> k8s_local [label="Pull", style=dashed]; + + nginx -> compose_ec2 [label="Proxy"]; + compose_ec2 -> sqs [label="Events"]; + sqs -> lambda_agg [label="Trigger"]; + lambda_compact -> s3 [label="Archive"]; + + coll1 -> compose_ec2 [label="gRPC", lhead=cluster_ec2]; + coll2 -> compose_ec2 [label="gRPC", lhead=cluster_ec2]; + coll3 -> compose_ec2 [label="gRPC", lhead=cluster_ec2]; +} diff --git a/docs/architecture/03-deployment.svg b/docs/architecture/03-deployment.svg new file mode 100644 index 0000000..cc1cf45 --- /dev/null +++ b/docs/architecture/03-deployment.svg @@ -0,0 +1,221 @@ + + + + + + +Deployment + +Deployment Architecture + +cluster_local + +Local Development + + +cluster_kind + +Kind Cluster + + +cluster_aws + +AWS (sysmonstm.mcrn.ar) + + +cluster_ec2 + +EC2 t2.small + + +cluster_lambda + +Lambda (Data Processing) + + +cluster_cicd + +CI/CD + + +cluster_collectors + +Monitored Machines + + + +tilt + + + +Tilt +(Live Reload) + + + +k8s_local + +K8s Pods +(via Kustomize) + + + + +compose + +Docker Compose +(Alternative) + + + +compose_ec2 + +Docker Compose +(All Services) + + + +sqs + +SQS +(Buffer) + + + +compose_ec2->sqs + + +Events + + + +nginx + +Nginx +(SSL Termination) + + + +nginx->compose_ec2 + + +Proxy + + + +lambda_agg + +Aggregator +Lambda + + + +lambda_compact + 
+Compactor +Lambda + + + +s3 + + +S3 +(Backup) + + + +lambda_compact->s3 + + +Archive + + + +sqs->lambda_agg + + +Trigger + + + +woodpecker + +Woodpecker CI + + + +registry + + +Container +Registry + + + +woodpecker->registry + + +Push + + + +registry->k8s_local + + +Pull + + + +registry->compose_ec2 + + +Pull + + + +coll1 + +Collector +(Machine 1) + + + +coll1->compose_ec2 + + +gRPC + + + +coll2 + +Collector +(Machine 2) + + + +coll2->compose_ec2 + + +gRPC + + + +coll3 + +Collector +(Machine N) + + + +coll3->compose_ec2 + + +gRPC + + + diff --git a/docs/architecture/04-grpc-services.dot b/docs/architecture/04-grpc-services.dot new file mode 100644 index 0000000..9b06929 --- /dev/null +++ b/docs/architecture/04-grpc-services.dot @@ -0,0 +1,67 @@ +digraph GrpcServices { + rankdir=LR; + compound=true; + fontname="Helvetica"; + node [fontname="Helvetica", fontsize=10]; + edge [fontname="Helvetica", fontsize=9]; + + labelloc="t"; + label="gRPC Service Definitions"; + fontsize=14; + + node [shape=record, style=filled]; + + // MetricsService + subgraph cluster_metrics { + label="MetricsService"; + style=filled; + fillcolor="#E8F5E9"; + + metrics_svc [label="{MetricsService|+ StreamMetrics(stream Metric) → StreamAck\l+ GetCurrentState(StateRequest) → MachineState\l+ GetAllStates(Empty) → AllMachinesState\l}", fillcolor="#C8E6C9"]; + + metric_msg [label="{Metric|machine_id: string\lhostname: string\ltimestamp_ms: int64\ltype: MetricType\lvalue: double\llabels: map\l}", fillcolor="#A5D6A7"]; + + machine_state [label="{MachineState|machine_id: string\lhostname: string\llast_seen_ms: int64\lcurrent_metrics: Metric[]\lhealth: HealthStatus\lmetadata: map\l}", fillcolor="#A5D6A7"]; + } + + // ControlService + subgraph cluster_control { + label="ControlService"; + style=filled; + fillcolor="#E3F2FD"; + + control_svc [label="{ControlService|+ Control(stream Command) → stream Response\l}", fillcolor="#90CAF9"]; + + commands [label="{ControlCommand|command_id: 
string\l|UpdateIntervalCommand\lRestartCollectionCommand\lShutdownCommand\l}", fillcolor="#64B5F6"]; + } + + // ConfigService + subgraph cluster_config { + label="ConfigService"; + style=filled; + fillcolor="#FFF3E0"; + + config_svc [label="{ConfigService|+ GetConfig(ConfigRequest) → CollectorConfig\l+ WatchConfig(ConfigRequest) → stream CollectorConfig\l}", fillcolor="#FFE0B2"]; + + collector_config [label="{CollectorConfig|collection_interval_seconds: int32\lenabled_metrics: MetricType[]\llabels: map\lthresholds: ThresholdConfig[]\l}", fillcolor="#FFCC80"]; + } + + // Enums + subgraph cluster_enums { + label="Enums"; + style=filled; + fillcolor="#F3E5F5"; + + metric_type [label="{MetricType|CPU_PERCENT\lMEMORY_PERCENT\lDISK_PERCENT\lNETWORK_*\lLOAD_AVG_*\l...}", fillcolor="#E1BEE7"]; + + health_status [label="{HealthStatus|HEALTHY\lWARNING\lCRITICAL\lUNKNOWN\lOFFLINE\l}", fillcolor="#CE93D8"]; + } + + // Relationships + metrics_svc -> metric_msg [style=dashed]; + metrics_svc -> machine_state [style=dashed]; + control_svc -> commands [style=dashed]; + config_svc -> collector_config [style=dashed]; + metric_msg -> metric_type [style=dotted]; + machine_state -> health_status [style=dotted]; +} diff --git a/docs/architecture/04-grpc-services.svg b/docs/architecture/04-grpc-services.svg new file mode 100644 index 0000000..d4af478 --- /dev/null +++ b/docs/architecture/04-grpc-services.svg @@ -0,0 +1,171 @@ + + + + + + +GrpcServices + +gRPC Service Definitions + +cluster_metrics + +MetricsService + + +cluster_control + +ControlService + + +cluster_config + +ConfigService + + +cluster_enums + +Enums + + + +metrics_svc + +MetricsService + ++ StreamMetrics(stream Metric) → StreamAck ++ GetCurrentState(StateRequest) → MachineState ++ GetAllStates(Empty) → AllMachinesState + + + +metric_msg + +Metric + +machine_id: string +hostname: string +timestamp_ms: int64 +type: MetricType +value: double +labels: map + + + +metrics_svc->metric_msg + + + + + +machine_state + 
+MachineState + +machine_id: string +hostname: string +last_seen_ms: int64 +current_metrics: Metric[] +health: HealthStatus +metadata: map + + + +metrics_svc->machine_state + + + + + +metric_type + +MetricType + +CPU_PERCENT +MEMORY_PERCENT +DISK_PERCENT +NETWORK_* +LOAD_AVG_* +... + + + +metric_msg->metric_type + + + + + +health_status + +HealthStatus + +HEALTHY +WARNING +CRITICAL +UNKNOWN +OFFLINE + + + +machine_state->health_status + + + + + +control_svc + +ControlService + ++ Control(stream Command) → stream Response + + + +commands + +ControlCommand + +command_id: string + +UpdateIntervalCommand +RestartCollectionCommand +ShutdownCommand + + + +control_svc->commands + + + + + +config_svc + +ConfigService + ++ GetConfig(ConfigRequest) → CollectorConfig ++ WatchConfig(ConfigRequest) → stream CollectorConfig + + + +collector_config + +CollectorConfig + +collection_interval_seconds: int32 +enabled_metrics: MetricType[] +labels: map +thresholds: ThresholdConfig[] + + + +config_svc->collector_config + + + + + diff --git a/docs/architecture/graph.html b/docs/architecture/graph.html new file mode 100644 index 0000000..8edb4cf --- /dev/null +++ b/docs/architecture/graph.html @@ -0,0 +1,120 @@ + + + + + + Graph Viewer - System Monitor + + + +
+ ← Index + +

Loading...

+
+ + + + + +
+
+ +
+ Graph +
+ + + + diff --git a/docs/architecture/index.html b/docs/architecture/index.html new file mode 100644 index 0000000..bcc92dd --- /dev/null +++ b/docs/architecture/index.html @@ -0,0 +1,207 @@ + + + + + + System Monitor - Architecture Documentation + + + +
+

System Monitoring Platform

+

Architecture & Design Documentation

+
+ +
+
+
+

System Overview

+ View Full +
+ + System Overview + +
+

High-level architecture showing all services, data stores, and communication patterns.

+

Key Components

+
    +
  • Collector: Runs on each monitored machine, streams metrics via gRPC
  • +
  • Aggregator: Central gRPC server, receives streams, normalizes data
  • +
  • Gateway: FastAPI service, WebSocket for browser, REST for queries
  • +
  • Alerts: Subscribes to events, evaluates thresholds, triggers actions
  • +
+
+
+ +
+
+

Data Flow Pipeline

+ View Full +
+ + Data Flow + +
+

How metrics flow from collection through storage with different retention tiers.

+

Storage Tiers

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Tier | Resolution | Retention | Use Case
Hot (Redis) | 5s | 5 min | Current state, live dashboard
Raw (TimescaleDB) | 5s | 24h | Recent detailed analysis
1-min Aggregates | 1m | 7d | Week view, trends
1-hour Aggregates | 1h | 90d | Long-term analysis
+
+
+ +
+
+

Deployment Architecture

+ View Full +
+ + Deployment + +
+

Deployment options from local development to the AWS demo environment.

+

Environments

+
    +
  • Local Dev: Kind + Tilt for K8s, or Docker Compose
  • +
  • Demo (EC2): Docker Compose on t2.small at sysmonstm.mcrn.ar
  • +
  • Lambda Pipeline: SQS-triggered aggregation for data processing experience
  • +
+
+
+ +
+
+

gRPC Service Definitions

+ View Full +
+ + gRPC Services + +
+

Protocol Buffer service and message definitions.

+

Services

+
    +
  • MetricsService: Client-side streaming for metrics ingestion
  • +
  • ControlService: Bidirectional streaming for collector control
  • +
  • ConfigService: Server-side streaming for config updates
  • +
+
+
+ +
+

Interview Talking Points

+
+
+

Domain Mapping

+
    +
  • Machine = Payment Processor
  • +
  • Metrics Stream = Transaction Stream
  • +
  • Thresholds = Fraud Detection
  • +
  • Aggregator = Payment Hub
  • +
+
+
+

gRPC Patterns

+
    +
  • Client streaming (metrics)
  • +
  • Server streaming (config)
  • +
  • Bidirectional (control)
  • +
  • Health checking
  • +
+
+
+

Event-Driven

+
    +
  • Redis Pub/Sub (current)
  • +
Event abstraction layer that allows a later switch to Kafka
  • +
  • Decoupled alert processing
  • +
  • Real-time WebSocket push
  • +
+
+
+

Resilience

+
    +
  • Collectors are independent
  • +
  • Graceful degradation
  • +
  • Retry with backoff
  • +
  • Health checks everywhere
  • +
+
+
+
+ +
+

Technology Stack

+
+
+

Core

+
    +
  • Python 3.11+
  • +
  • FastAPI
  • +
  • gRPC / protobuf
  • +
  • asyncio
  • +
+
+
+

Data

+
    +
  • TimescaleDB
  • +
  • Redis
  • +
  • Redis Pub/Sub
  • +
+
+
+

Infrastructure

+
    +
  • Docker
  • +
  • Kubernetes
  • +
  • Kind + Tilt
  • +
  • Terraform
  • +
+
+
+

CI/CD

+
    +
  • Woodpecker CI
  • +
  • Kustomize
  • +
  • Container Registry
  • +
+
+
+
+
+ +
+

System Monitoring Platform - Architecture Documentation

+

Generated:

+
+ + diff --git a/docs/architecture/styles.css b/docs/architecture/styles.css new file mode 100644 index 0000000..4f251b9 --- /dev/null +++ b/docs/architecture/styles.css @@ -0,0 +1,343 @@ +:root { + --bg-primary: #1a1a2e; + --bg-secondary: #16213e; + --bg-card: #0f3460; + --text-primary: #eee; + --text-secondary: #a0a0a0; + --accent: #e94560; + --accent-secondary: #533483; + --border: #2a2a4a; +} + +* { + box-sizing: border-box; + margin: 0; + padding: 0; +} + +body { + font-family: 'Segoe UI', system-ui, -apple-system, sans-serif; + background: var(--bg-primary); + color: var(--text-primary); + line-height: 1.6; +} + +header { + background: linear-gradient(135deg, var(--bg-secondary), var(--accent-secondary)); + padding: 2rem; + text-align: center; + border-bottom: 2px solid var(--accent); +} + +header h1 { + font-size: 2rem; + margin-bottom: 0.5rem; +} + +header .subtitle { + color: var(--text-secondary); + font-size: 1rem; +} + +main { + max-width: 1400px; + margin: 0 auto; + padding: 2rem; +} + +/* Graph sections */ +.graph-section { + background: var(--bg-secondary); + border-radius: 8px; + padding: 1.5rem; + margin-bottom: 2rem; + border: 1px solid var(--border); +} + +.graph-header-row { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 1rem; +} + +.graph-header-row h2 { + font-size: 1.25rem; + color: var(--accent); +} + +.view-btn { + background: var(--accent); + color: white; + padding: 0.5rem 1rem; + border-radius: 4px; + text-decoration: none; + font-size: 0.875rem; + transition: opacity 0.2s; +} + +.view-btn:hover { + opacity: 0.8; +} + +.graph-preview { + display: block; + background: white; + border-radius: 4px; + padding: 1rem; + margin-bottom: 1rem; + overflow: auto; + max-height: 400px; +} + +.graph-preview img { + max-width: 100%; + height: auto; +} + +.graph-details { + color: var(--text-secondary); + font-size: 0.9rem; +} + +.graph-details h4 { + color: var(--text-primary); + margin: 1rem 0 0.5rem; +} + 
+.graph-details ul { + margin-left: 1.5rem; +} + +.graph-details li { + margin-bottom: 0.25rem; +} + +/* Tech section */ +.tech-section { + background: var(--bg-secondary); + border-radius: 8px; + padding: 1.5rem; + margin-bottom: 2rem; + border: 1px solid var(--border); +} + +.tech-section h2 { + color: var(--accent); + margin-bottom: 1rem; +} + +.tech-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); + gap: 1.5rem; +} + +.tech-column h3 { + color: var(--text-primary); + font-size: 1rem; + margin-bottom: 0.75rem; + padding-bottom: 0.5rem; + border-bottom: 1px solid var(--border); +} + +.tech-column ul { + list-style: none; +} + +.tech-column li { + padding: 0.25rem 0; + color: var(--text-secondary); +} + +/* Findings */ +.findings-section { + margin-bottom: 2rem; +} + +.findings-section h2 { + color: var(--accent); + margin-bottom: 1rem; +} + +.findings-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(280px, 1fr)); + gap: 1rem; +} + +.finding-card { + background: var(--bg-secondary); + border-radius: 8px; + padding: 1.25rem; + border: 1px solid var(--border); +} + +.finding-card h3 { + color: var(--accent); + font-size: 1rem; + margin-bottom: 0.75rem; +} + +.finding-card ul { + margin-left: 1rem; + color: var(--text-secondary); +} + +.finding-card code { + background: var(--bg-primary); + padding: 0.125rem 0.375rem; + border-radius: 3px; + font-size: 0.85em; +} + +/* Footer */ +footer { + text-align: center; + padding: 2rem; + color: var(--text-secondary); + border-top: 1px solid var(--border); +} + +footer .date { + font-size: 0.85rem; +} + +/* Graph viewer page */ +body.graph-viewer { + display: flex; + flex-direction: column; + height: 100vh; +} + +.graph-header { + display: flex; + align-items: center; + gap: 1rem; + padding: 0.75rem 1rem; + background: var(--bg-secondary); + border-bottom: 1px solid var(--border); + flex-wrap: wrap; +} + +.back-link { + color: var(--accent); + text-decoration: none; 
+} + +.nav-controls { + display: flex; + align-items: center; + gap: 0.5rem; +} + +.nav-controls button { + background: var(--bg-card); + color: var(--text-primary); + border: 1px solid var(--border); + padding: 0.25rem 0.75rem; + border-radius: 4px; + cursor: pointer; +} + +.nav-controls button:disabled { + opacity: 0.3; + cursor: not-allowed; +} + +#nav-position { + color: var(--text-secondary); + font-size: 0.85rem; +} + +.graph-header h1 { + flex: 1; + font-size: 1rem; + text-align: center; +} + +.graph-controls { + display: flex; + gap: 0.5rem; +} + +.graph-controls button { + background: var(--bg-card); + color: var(--text-primary); + border: 1px solid var(--border); + padding: 0.375rem 0.75rem; + border-radius: 4px; + cursor: pointer; + font-size: 0.85rem; +} + +.graph-controls button:hover { + background: var(--accent); +} + +.graph-container { + flex: 1; + overflow: auto; + background: white; + display: flex; + justify-content: center; + align-items: flex-start; + padding: 1rem; +} + +.graph-container.fit img { + max-width: 100%; + max-height: calc(100vh - 60px); + object-fit: contain; +} + +.graph-container.fit-width img { + width: 100%; + height: auto; +} + +.graph-container.fit-height img { + height: calc(100vh - 60px); + width: auto; +} + +.graph-container.actual-size img { + /* No constraints */ +} + +/* Tables */ +.details-table { + width: 100%; + border-collapse: collapse; + margin: 1rem 0; + font-size: 0.85rem; +} + +.details-table th, +.details-table td { + padding: 0.5rem; + text-align: left; + border-bottom: 1px solid var(--border); +} + +.details-table th { + color: var(--text-primary); + background: var(--bg-primary); +} + +.details-table td { + color: var(--text-secondary); +} + +.details-table code { + background: var(--bg-primary); + padding: 0.125rem 0.375rem; + border-radius: 3px; +} + +.note { + font-style: italic; + font-size: 0.85rem; + color: var(--text-secondary); + margin-top: 0.5rem; +} diff --git 
a/infra/aws/lambdas/aggregator/placeholder.txt b/infra/aws/lambdas/aggregator/placeholder.txt new file mode 100644 index 0000000..48cdce8 --- /dev/null +++ b/infra/aws/lambdas/aggregator/placeholder.txt @@ -0,0 +1 @@ +placeholder diff --git a/infra/aws/lambdas/aggregator/placeholder.zip b/infra/aws/lambdas/aggregator/placeholder.zip new file mode 100644 index 0000000000000000000000000000000000000000..a26ef866da7506d19df44123c54d9956c8ffc25e GIT binary patch literal 192 zcmWIWW@h1H00Fj~xzP%ZI(K-0Y!K#WkYOmuNlZ@7$j?bhEz&EgCl2Wb!Z$>6LW?aTffNW!61mZ1?AQqBNh$XBLOVBI`@MdKLDQ5)2 KP#_J$3=9Cm5GKL^ literal 0 HcmV?d00001 diff --git a/infra/aws/lambdas/compactor/placeholder.txt b/infra/aws/lambdas/compactor/placeholder.txt new file mode 100644 index 0000000..48cdce8 --- /dev/null +++ b/infra/aws/lambdas/compactor/placeholder.txt @@ -0,0 +1 @@ +placeholder diff --git a/infra/aws/lambdas/compactor/placeholder.zip b/infra/aws/lambdas/compactor/placeholder.zip new file mode 100644 index 0000000000000000000000000000000000000000..a26ef866da7506d19df44123c54d9956c8ffc25e GIT binary patch literal 192 zcmWIWW@h1H00Fj~xzP%ZI(K-0Y!K#WkYOmuNlZ@7$j?bhEz&EgCl2Wb!Z$>6LW?aTffNW!61mZ1?AQqBNh$XBLOVBI`@MdKLDQ5)2 KP#_J$3=9Cm5GKL^ literal 0 HcmV?d00001 diff --git a/infra/aws/terraform/ec2.tf b/infra/aws/terraform/ec2.tf new file mode 100644 index 0000000..e9d2b88 --- /dev/null +++ b/infra/aws/terraform/ec2.tf @@ -0,0 +1,148 @@ +# EC2 Instance for Docker Compose deployment + +resource "aws_security_group" "sysmonstm" { + name_prefix = "${var.project_name}-" + description = "Security group for System Monitor Platform" + + # HTTP/HTTPS + ingress { + from_port = 80 + to_port = 80 + protocol = "tcp" + cidr_blocks = ["0.0.0.0/0"] + description = "HTTP" + } + + ingress { + from_port = 443 + to_port = 443 + protocol = "tcp" + cidr_blocks = ["0.0.0.0/0"] + description = "HTTPS" + } + + # gRPC for collectors + ingress { + from_port = 50051 + to_port = 50051 + protocol = "tcp" + cidr_blocks = ["0.0.0.0/0"] + 
description = "gRPC Aggregator" + } + + # SSH (restricted) + dynamic "ingress" { + for_each = length(var.allowed_ssh_cidrs) > 0 ? [1] : [] + content { + from_port = 22 + to_port = 22 + protocol = "tcp" + cidr_blocks = var.allowed_ssh_cidrs + description = "SSH" + } + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + description = "Allow all outbound" + } + + tags = { + Name = "${var.project_name}-sg" + } +} + +resource "aws_iam_role" "ec2" { + name_prefix = "${var.project_name}-ec2-" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Principal = { + Service = "ec2.amazonaws.com" + } + } + ] + }) +} + +resource "aws_iam_role_policy_attachment" "ec2_ssm" { + role = aws_iam_role.ec2.name + policy_arn = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore" +} + +resource "aws_iam_instance_profile" "ec2" { + name_prefix = "${var.project_name}-" + role = aws_iam_role.ec2.name +} + +resource "aws_instance" "sysmonstm" { + ami = data.aws_ami.amazon_linux_2023.id + instance_type = var.ec2_instance_type + key_name = var.ec2_key_name != "" ? 
var.ec2_key_name : null + vpc_security_group_ids = [aws_security_group.sysmonstm.id] + iam_instance_profile = aws_iam_instance_profile.ec2.name + + root_block_device { + volume_size = 20 + volume_type = "gp3" + encrypted = true + } + + user_data = <<-EOF + #!/bin/bash + set -e + + # Install Docker + dnf update -y + dnf install -y docker git + systemctl enable docker + systemctl start docker + + # Install Docker Compose + curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" \ + -o /usr/local/bin/docker-compose + chmod +x /usr/local/bin/docker-compose + + # Add ec2-user to docker group + usermod -aG docker ec2-user + + # Clone and start the application + cd /home/ec2-user + git clone https://github.com/yourusername/sysmonstm.git || true + cd sysmonstm + + # Create .env file + cat > .env < INTERVAL '1 hour', + if_not_exists => TRUE + ); + + CREATE INDEX IF NOT EXISTS idx_metrics_raw_machine + ON metrics_raw (machine_id, time DESC); + CREATE INDEX IF NOT EXISTS idx_metrics_raw_type + ON metrics_raw (metric_type, time DESC); + + CREATE TABLE IF NOT EXISTS metrics_1m ( + time TIMESTAMPTZ NOT NULL, + machine_id TEXT NOT NULL, + hostname TEXT NOT NULL, + metric_type TEXT NOT NULL, + avg_value DOUBLE PRECISION NOT NULL, + min_value DOUBLE PRECISION NOT NULL, + max_value DOUBLE PRECISION NOT NULL, + sample_count INTEGER NOT NULL + ); + + SELECT create_hypertable('metrics_1m', 'time', + chunk_time_interval => INTERVAL '1 day', + if_not_exists => TRUE + ); + + CREATE TABLE IF NOT EXISTS machines ( + machine_id TEXT PRIMARY KEY, + hostname TEXT NOT NULL, + first_seen TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_seen TIMESTAMPTZ NOT NULL DEFAULT NOW(), + metadata JSONB DEFAULT '{}'::jsonb, + health TEXT NOT NULL DEFAULT 'UNKNOWN' + ); + + CREATE TABLE IF NOT EXISTS alert_rules ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + metric_type TEXT NOT NULL, + operator TEXT NOT NULL, + threshold DOUBLE PRECISION NOT NULL, + 
severity TEXT NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + + CREATE TABLE IF NOT EXISTS alerts ( + id SERIAL, + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + machine_id TEXT NOT NULL, + rule_id INTEGER REFERENCES alert_rules(id), + rule_name TEXT NOT NULL, + metric_type TEXT NOT NULL, + value DOUBLE PRECISION NOT NULL, + threshold DOUBLE PRECISION NOT NULL, + severity TEXT NOT NULL, + resolved_at TIMESTAMPTZ, + PRIMARY KEY (id, time) + ); + + SELECT create_hypertable('alerts', 'time', + chunk_time_interval => INTERVAL '1 day', + if_not_exists => TRUE + ); + + SELECT add_retention_policy('metrics_raw', INTERVAL '24 hours', if_not_exists => TRUE); + SELECT add_retention_policy('alerts', INTERVAL '30 days', if_not_exists => TRUE); + + INSERT INTO alert_rules (name, metric_type, operator, threshold, severity) + VALUES + ('High CPU Usage', 'CPU_PERCENT', 'gt', 80.0, 'warning'), + ('Critical CPU Usage', 'CPU_PERCENT', 'gt', 95.0, 'critical'), + ('High Memory Usage', 'MEMORY_PERCENT', 'gt', 85.0, 'warning'), + ('Critical Memory Usage', 'MEMORY_PERCENT', 'gt', 95.0, 'critical') + ON CONFLICT (name) DO NOTHING; diff --git a/k8s/base/timescaledb/kustomization.yaml b/k8s/base/timescaledb/kustomization.yaml new file mode 100644 index 0000000..ae9a73d --- /dev/null +++ b/k8s/base/timescaledb/kustomization.yaml @@ -0,0 +1,11 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +commonLabels: + app.kubernetes.io/name: timescaledb + app.kubernetes.io/component: database + +resources: + - statefulset.yaml + - service.yaml + - configmap.yaml diff --git a/k8s/base/timescaledb/service.yaml b/k8s/base/timescaledb/service.yaml new file mode 100644 index 0000000..4b313bb --- /dev/null +++ b/k8s/base/timescaledb/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: timescaledb +spec: + selector: + app.kubernetes.io/name: timescaledb + 
ports: + - port: 5432 + targetPort: postgres + name: postgres + clusterIP: None # Headless for StatefulSet diff --git a/k8s/base/timescaledb/statefulset.yaml b/k8s/base/timescaledb/statefulset.yaml new file mode 100644 index 0000000..411d5d3 --- /dev/null +++ b/k8s/base/timescaledb/statefulset.yaml @@ -0,0 +1,65 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: timescaledb +spec: + serviceName: timescaledb + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: timescaledb + template: + metadata: + labels: + app.kubernetes.io/name: timescaledb + spec: + containers: + - name: timescaledb + image: timescale/timescaledb:latest-pg15 + ports: + - containerPort: 5432 + name: postgres + env: + - name: POSTGRES_USER + value: monitor + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: timescaledb-secret + key: password + - name: POSTGRES_DB + value: monitor + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + volumeMounts: + - name: data + mountPath: /var/lib/postgresql/data + - name: init-scripts + mountPath: /docker-entrypoint-initdb.d + livenessProbe: + exec: + command: ["pg_isready", "-U", "monitor", "-d", "monitor"] + initialDelaySeconds: 30 + periodSeconds: 10 + readinessProbe: + exec: + command: ["pg_isready", "-U", "monitor", "-d", "monitor"] + initialDelaySeconds: 5 + periodSeconds: 5 + volumes: + - name: init-scripts + configMap: + name: timescaledb-init + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + resources: + requests: + storage: 5Gi diff --git a/k8s/overlays/local/kustomization.yaml b/k8s/overlays/local/kustomization.yaml new file mode 100644 index 0000000..4f2fe45 --- /dev/null +++ b/k8s/overlays/local/kustomization.yaml @@ -0,0 +1,22 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: sysmonstm + +resources: + - ../../base + - secrets.yaml + +patches: + - path: patches/reduce-resources.yaml + 
+images: + - name: sysmonstm/aggregator + newName: sysmonstm-aggregator + newTag: dev + - name: sysmonstm/gateway + newName: sysmonstm-gateway + newTag: dev + - name: sysmonstm/alerts + newName: sysmonstm-alerts + newTag: dev diff --git a/k8s/overlays/local/patches/reduce-resources.yaml b/k8s/overlays/local/patches/reduce-resources.yaml new file mode 100644 index 0000000..fa70112 --- /dev/null +++ b/k8s/overlays/local/patches/reduce-resources.yaml @@ -0,0 +1,50 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: aggregator +spec: + template: + spec: + containers: + - name: aggregator + resources: + requests: + memory: "64Mi" + cpu: "50m" + limits: + memory: "128Mi" + cpu: "200m" +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gateway +spec: + template: + spec: + containers: + - name: gateway + resources: + requests: + memory: "64Mi" + cpu: "50m" + limits: + memory: "128Mi" + cpu: "200m" +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: timescaledb +spec: + template: + spec: + containers: + - name: timescaledb + resources: + requests: + memory: "128Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "200m" diff --git a/k8s/overlays/local/secrets.yaml b/k8s/overlays/local/secrets.yaml new file mode 100644 index 0000000..385cb5c --- /dev/null +++ b/k8s/overlays/local/secrets.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: timescaledb-secret + namespace: sysmonstm +type: Opaque +stringData: + password: "monitor" # Only for local dev! 
diff --git a/proto/metrics.proto b/proto/metrics.proto new file mode 100644 index 0000000..44130b9 --- /dev/null +++ b/proto/metrics.proto @@ -0,0 +1,159 @@ +syntax = "proto3"; + +package monitoring; + +option go_package = "github.com/your-org/sysmonstm/proto"; + +// MetricsService handles streaming metrics from collectors to aggregator +service MetricsService { + // Client-side streaming: collector streams metrics to aggregator + rpc StreamMetrics(stream Metric) returns (StreamAck) {} + + // Get current state of a machine + rpc GetCurrentState(StateRequest) returns (MachineState) {} + + // Get current state of all machines + rpc GetAllStates(Empty) returns (AllMachinesState) {} +} + +// ControlService handles bidirectional control commands +service ControlService { + // Bidirectional streaming for commands and responses + rpc Control(stream ControlCommand) returns (stream ControlResponse) {} +} + +// ConfigService handles dynamic configuration +service ConfigService { + // Get current configuration for a collector + rpc GetConfig(ConfigRequest) returns (CollectorConfig) {} + + // Stream configuration updates + rpc WatchConfig(ConfigRequest) returns (stream CollectorConfig) {} +} + +// Empty message for requests with no parameters +message Empty {} + +// Basic metric message +message Metric { + string machine_id = 1; + string hostname = 2; + int64 timestamp_ms = 3; + MetricType type = 4; + double value = 5; + map<string, string> labels = 6; +} + +// Batch of metrics for efficient transmission +message MetricBatch { + string machine_id = 1; + string hostname = 2; + int64 timestamp_ms = 3; + repeated MetricPoint metrics = 4; +} + +message MetricPoint { + MetricType type = 1; + double value = 2; + map<string, string> labels = 3; +} + +enum MetricType { + METRIC_TYPE_UNSPECIFIED = 0; + CPU_PERCENT = 1; + CPU_PERCENT_PER_CORE = 2; + MEMORY_PERCENT = 3; + MEMORY_USED_BYTES = 4; + MEMORY_AVAILABLE_BYTES = 5; + DISK_PERCENT = 6; + DISK_USED_BYTES = 7; + DISK_READ_BYTES_SEC = 8; + DISK_WRITE_BYTES_SEC =
9; + NETWORK_SENT_BYTES_SEC = 10; + NETWORK_RECV_BYTES_SEC = 11; + NETWORK_CONNECTIONS = 12; + PROCESS_COUNT = 13; + LOAD_AVG_1M = 14; + LOAD_AVG_5M = 15; + LOAD_AVG_15M = 16; +} + +// Acknowledgment for streamed metrics +message StreamAck { + bool success = 1; + int64 metrics_received = 2; + string message = 3; +} + +// Request for machine state +message StateRequest { + string machine_id = 1; +} + +// Current state of a single machine +message MachineState { + string machine_id = 1; + string hostname = 2; + int64 last_seen_ms = 3; + repeated Metric current_metrics = 4; + HealthStatus health = 5; + map<string, string> metadata = 6; +} + +// State of all machines +message AllMachinesState { + repeated MachineState machines = 1; +} + +enum HealthStatus { + HEALTH_STATUS_UNSPECIFIED = 0; + HEALTHY = 1; + WARNING = 2; + CRITICAL = 3; + UNKNOWN = 4; + OFFLINE = 5; +} + +// Control commands for collectors +message ControlCommand { + string command_id = 1; + oneof command { + UpdateIntervalCommand update_interval = 2; + RestartCollectionCommand restart = 3; + ShutdownCommand shutdown = 4; + } +} + +message UpdateIntervalCommand { + int32 interval_seconds = 1; +} + +message RestartCollectionCommand {} + +message ShutdownCommand { + bool graceful = 1; +} + +message ControlResponse { + string command_id = 1; + bool success = 2; + string message = 3; +} + +// Configuration messages +message ConfigRequest { + string machine_id = 1; +} + +message CollectorConfig { + int32 collection_interval_seconds = 1; + repeated MetricType enabled_metrics = 2; + map<string, string> labels = 3; + repeated ThresholdConfig thresholds = 4; +} + +message ThresholdConfig { + MetricType metric_type = 1; + double warning_threshold = 2; + double critical_threshold = 3; +} diff --git a/scripts/generate-diagrams.sh b/scripts/generate-diagrams.sh new file mode 100755 index 0000000..5d9c0df --- /dev/null +++ b/scripts/generate-diagrams.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# Generate SVG diagrams from Graphviz DOT files +# Requires:
graphviz (apt install graphviz) + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ARCH_DIR="$SCRIPT_DIR/../docs/architecture" + +cd "$ARCH_DIR" + +echo "Generating architecture diagrams..." + +for dotfile in *.dot; do + if [ -f "$dotfile" ]; then + svgfile="${dotfile%.dot}.svg" + echo " $dotfile -> $svgfile" + dot -Tsvg "$dotfile" -o "$svgfile" + fi +done + +echo "Done! Open docs/architecture/index.html in a browser." diff --git a/scripts/init-db.sql b/scripts/init-db.sql new file mode 100644 index 0000000..5d66634 --- /dev/null +++ b/scripts/init-db.sql @@ -0,0 +1,158 @@ +-- TimescaleDB initialization script +-- Creates hypertables for time-series metrics storage + +-- Enable TimescaleDB extension +CREATE EXTENSION IF NOT EXISTS timescaledb; + +-- Raw metrics table (high resolution, short retention) +CREATE TABLE IF NOT EXISTS metrics_raw ( + time TIMESTAMPTZ NOT NULL, + machine_id TEXT NOT NULL, + hostname TEXT NOT NULL, + metric_type TEXT NOT NULL, + value DOUBLE PRECISION NOT NULL, + labels JSONB DEFAULT '{}'::jsonb +); + +-- Convert to hypertable with 1-hour chunks +SELECT create_hypertable('metrics_raw', 'time', + chunk_time_interval => INTERVAL '1 hour', + if_not_exists => TRUE +); + +-- Create indexes for common queries +CREATE INDEX IF NOT EXISTS idx_metrics_raw_machine + ON metrics_raw (machine_id, time DESC); +CREATE INDEX IF NOT EXISTS idx_metrics_raw_type + ON metrics_raw (metric_type, time DESC); + +-- Aggregated metrics table (1-minute resolution, longer retention) +CREATE TABLE IF NOT EXISTS metrics_1m ( + time TIMESTAMPTZ NOT NULL, + machine_id TEXT NOT NULL, + hostname TEXT NOT NULL, + metric_type TEXT NOT NULL, + avg_value DOUBLE PRECISION NOT NULL, + min_value DOUBLE PRECISION NOT NULL, + max_value DOUBLE PRECISION NOT NULL, + sample_count INTEGER NOT NULL +); + +SELECT create_hypertable('metrics_1m', 'time', + chunk_time_interval => INTERVAL '1 day', + if_not_exists => TRUE +); + +CREATE INDEX IF NOT EXISTS 
idx_metrics_1m_machine + ON metrics_1m (machine_id, time DESC); + +-- Aggregated metrics table (1-hour resolution, long retention) +CREATE TABLE IF NOT EXISTS metrics_1h ( + time TIMESTAMPTZ NOT NULL, + machine_id TEXT NOT NULL, + hostname TEXT NOT NULL, + metric_type TEXT NOT NULL, + avg_value DOUBLE PRECISION NOT NULL, + min_value DOUBLE PRECISION NOT NULL, + max_value DOUBLE PRECISION NOT NULL, + sample_count INTEGER NOT NULL +); + +SELECT create_hypertable('metrics_1h', 'time', + chunk_time_interval => INTERVAL '1 week', + if_not_exists => TRUE +); + +CREATE INDEX IF NOT EXISTS idx_metrics_1h_machine + ON metrics_1h (machine_id, time DESC); + +-- Machines registry +CREATE TABLE IF NOT EXISTS machines ( + machine_id TEXT PRIMARY KEY, + hostname TEXT NOT NULL, + first_seen TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_seen TIMESTAMPTZ NOT NULL DEFAULT NOW(), + metadata JSONB DEFAULT '{}'::jsonb, + health TEXT NOT NULL DEFAULT 'UNKNOWN' +); + +-- Alert rules configuration +CREATE TABLE IF NOT EXISTS alert_rules ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + metric_type TEXT NOT NULL, + operator TEXT NOT NULL CHECK (operator IN ('gt', 'lt', 'gte', 'lte', 'eq')), + threshold DOUBLE PRECISION NOT NULL, + severity TEXT NOT NULL CHECK (severity IN ('warning', 'critical')), + enabled BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Alert history +CREATE TABLE IF NOT EXISTS alerts ( + id SERIAL, + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + machine_id TEXT NOT NULL, + rule_id INTEGER REFERENCES alert_rules(id), + rule_name TEXT NOT NULL, + metric_type TEXT NOT NULL, + value DOUBLE PRECISION NOT NULL, + threshold DOUBLE PRECISION NOT NULL, + severity TEXT NOT NULL, + resolved_at TIMESTAMPTZ, + PRIMARY KEY (id, time) +); + +SELECT create_hypertable('alerts', 'time', + chunk_time_interval => INTERVAL '1 day', + if_not_exists => TRUE +); + +-- Retention policies +-- Raw data: 24 
hours +SELECT add_retention_policy('metrics_raw', INTERVAL '24 hours', if_not_exists => TRUE); + +-- 1-minute aggregates: 7 days +SELECT add_retention_policy('metrics_1m', INTERVAL '7 days', if_not_exists => TRUE); + +-- 1-hour aggregates: 90 days +SELECT add_retention_policy('metrics_1h', INTERVAL '90 days', if_not_exists => TRUE); + +-- Alerts: 30 days +SELECT add_retention_policy('alerts', INTERVAL '30 days', if_not_exists => TRUE); + +-- Continuous aggregates for automatic downsampling +CREATE MATERIALIZED VIEW IF NOT EXISTS metrics_1m_agg +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', time) AS time, + machine_id, + hostname, + metric_type, + AVG(value) AS avg_value, + MIN(value) AS min_value, + MAX(value) AS max_value, + COUNT(*) AS sample_count +FROM metrics_raw +GROUP BY time_bucket('1 minute', time), machine_id, hostname, metric_type +WITH NO DATA; + +-- Refresh policy for continuous aggregate +SELECT add_continuous_aggregate_policy('metrics_1m_agg', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 minute', + if_not_exists => TRUE +); + +-- Insert default alert rules +INSERT INTO alert_rules (name, metric_type, operator, threshold, severity) +VALUES + ('High CPU Usage', 'CPU_PERCENT', 'gt', 80.0, 'warning'), + ('Critical CPU Usage', 'CPU_PERCENT', 'gt', 95.0, 'critical'), + ('High Memory Usage', 'MEMORY_PERCENT', 'gt', 85.0, 'warning'), + ('Critical Memory Usage', 'MEMORY_PERCENT', 'gt', 95.0, 'critical'), + ('High Disk Usage', 'DISK_PERCENT', 'gt', 80.0, 'warning'), + ('Critical Disk Usage', 'DISK_PERCENT', 'gt', 90.0, 'critical') +ON CONFLICT (name) DO NOTHING; diff --git a/services/aggregator/Dockerfile b/services/aggregator/Dockerfile new file mode 100644 index 0000000..c4fb40e --- /dev/null +++ b/services/aggregator/Dockerfile @@ -0,0 +1,47 @@ +# Multi-stage Dockerfile for Aggregator service + +FROM python:3.11-slim as base + +WORKDIR /app + +# Install system dependencies 
including grpc_health_probe
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && curl -fsSL https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.24/grpc_health_probe-linux-amd64 \
+        -o /bin/grpc_health_probe \
+    && chmod +x /bin/grpc_health_probe \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY services/aggregator/requirements.txt /app/requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY shared /app/shared
+COPY proto /app/proto
+
+RUN python -m grpc_tools.protoc \
+    -I/app/proto \
+    --python_out=/app/shared \
+    --grpc_python_out=/app/shared \
+    /app/proto/metrics.proto
+
+COPY services/aggregator /app/services/aggregator
+
+ENV PYTHONPATH=/app
+ENV PYTHONUNBUFFERED=1
+
+# =============================================================================
+FROM base as development
+
+RUN pip install --no-cache-dir watchfiles
+
+CMD ["python", "-m", "watchfiles", "python services/aggregator/main.py", "/app/services/aggregator"]
+
+# =============================================================================
+FROM base as production
+
+RUN useradd --create-home --shell /bin/bash appuser
+USER appuser
+
+EXPOSE 50051
+
+CMD ["python", "services/aggregator/main.py"]
diff --git a/services/aggregator/requirements.txt b/services/aggregator/requirements.txt
new file mode 100644
index 0000000..eea914a
--- /dev/null
+++ b/services/aggregator/requirements.txt
@@ -0,0 +1,9 @@
+grpcio>=1.60.0
+grpcio-tools>=1.60.0
+grpcio-health-checking>=1.60.0
+redis>=5.0.0
+asyncpg>=0.29.0
+structlog>=23.2.0
+python-json-logger>=2.0.7
+pydantic>=2.5.0
+pydantic-settings>=2.1.0
diff --git a/services/alerts/Dockerfile b/services/alerts/Dockerfile
new file mode 100644
index 0000000..d1300a9
--- /dev/null
+++ b/services/alerts/Dockerfile
@@ -0,0 +1,35 @@
+# Multi-stage Dockerfile for Alerts service
+
+FROM python:3.11-slim as base
+
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY services/alerts/requirements.txt /app/requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY shared /app/shared
+COPY proto /app/proto
+
+COPY services/alerts /app/services/alerts
+
+ENV PYTHONPATH=/app
+ENV PYTHONUNBUFFERED=1
+
+# =============================================================================
+FROM base as development
+
+RUN pip install --no-cache-dir watchfiles
+
+CMD ["python", "-m", "watchfiles", "python services/alerts/main.py", "/app/services/alerts"]
+
+# =============================================================================
+FROM base as production
+
+RUN useradd --create-home --shell /bin/bash appuser
+USER appuser
+
+CMD ["python", "services/alerts/main.py"]
diff --git a/services/alerts/requirements.txt b/services/alerts/requirements.txt
new file mode 100644
index 0000000..dc6d7d7
--- /dev/null
+++ b/services/alerts/requirements.txt
@@ -0,0 +1,6 @@
+redis>=5.0.0
+asyncpg>=0.29.0
+structlog>=23.2.0
+python-json-logger>=2.0.7
+pydantic>=2.5.0
+pydantic-settings>=2.1.0
diff --git a/services/collector/Dockerfile b/services/collector/Dockerfile
new file mode 100644
index 0000000..b514f26
--- /dev/null
+++ b/services/collector/Dockerfile
@@ -0,0 +1,55 @@
+# Multi-stage Dockerfile for Collector service
+# Stages: base -> development, base -> production
+
+# =============================================================================
+# Base stage - common dependencies
+# =============================================================================
+FROM python:3.11-slim as base
+
+WORKDIR /app
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install Python dependencies
+COPY services/collector/requirements.txt /app/requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy shared code and proto
+COPY shared /app/shared
+COPY proto /app/proto
+
+# Generate gRPC code from proto
+RUN python -m grpc_tools.protoc \
+    -I/app/proto \
+    --python_out=/app/shared \
+    --grpc_python_out=/app/shared \
+    /app/proto/metrics.proto
+
+# Copy service code
+COPY services/collector /app/services/collector
+
+ENV PYTHONPATH=/app
+ENV PYTHONUNBUFFERED=1
+
+# =============================================================================
+# Development stage - with hot reload
+# =============================================================================
+FROM base as development
+
+RUN pip install --no-cache-dir watchfiles
+
+CMD ["python", "-m", "watchfiles", "python services/collector/main.py", "/app/services/collector"]
+
+# =============================================================================
+# Production stage - optimized
+# =============================================================================
+FROM base as production
+
+# Run as non-root user
+RUN useradd --create-home --shell /bin/bash appuser
+USER appuser
+
+CMD ["python", "services/collector/main.py"]
diff --git a/services/collector/requirements.txt b/services/collector/requirements.txt
new file mode 100644
index 0000000..9a806f4
--- /dev/null
+++ b/services/collector/requirements.txt
@@ -0,0 +1,7 @@
+grpcio>=1.60.0
+grpcio-tools>=1.60.0
+psutil>=5.9.0
+structlog>=23.2.0
+python-json-logger>=2.0.7
+pydantic>=2.5.0
+pydantic-settings>=2.1.0
diff --git a/services/gateway/Dockerfile b/services/gateway/Dockerfile
new file mode 100644
index 0000000..ad54609
--- /dev/null
+++ b/services/gateway/Dockerfile
@@ -0,0 +1,44 @@
+# Multi-stage Dockerfile for Gateway service (FastAPI)
+
+FROM python:3.11-slim as base
+
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY services/gateway/requirements.txt /app/requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY shared /app/shared
+COPY proto /app/proto
+
+RUN python -m grpc_tools.protoc \
+    -I/app/proto \
+    --python_out=/app/shared \
+    --grpc_python_out=/app/shared \
+    /app/proto/metrics.proto
+
+COPY services/gateway /app/services/gateway
+COPY web /app/web
+
+ENV PYTHONPATH=/app
+ENV PYTHONUNBUFFERED=1
+
+# =============================================================================
+FROM base as development
+
+RUN pip install --no-cache-dir watchfiles
+
+CMD ["uvicorn", "services.gateway.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
+
+# =============================================================================
+FROM base as production
+
+RUN useradd --create-home --shell /bin/bash appuser
+USER appuser
+
+EXPOSE 8000
+
+CMD ["uvicorn", "services.gateway.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
diff --git a/services/gateway/requirements.txt b/services/gateway/requirements.txt
new file mode 100644
index 0000000..ead958c
--- /dev/null
+++ b/services/gateway/requirements.txt
@@ -0,0 +1,13 @@
+fastapi>=0.109.0
+uvicorn[standard]>=0.27.0
+grpcio>=1.60.0
+grpcio-tools>=1.60.0
+redis>=5.0.0
+asyncpg>=0.29.0
+websockets>=12.0
+jinja2>=3.1.2
+structlog>=23.2.0
+python-json-logger>=2.0.7
+pydantic>=2.5.0
+pydantic-settings>=2.1.0
+httpx>=0.26.0
diff --git a/shared/events/__init__.py b/shared/events/__init__.py
new file mode 100644
index 0000000..ccf1c20
--- /dev/null
+++ b/shared/events/__init__.py
@@ -0,0 +1,34 @@
+"""
+Event publishing/subscribing abstraction layer.
+
+Supports:
+- Redis Pub/Sub (default, simple)
+- Redis Streams (future: consumer groups, persistence)
+- Kafka (future, for high-throughput)
+
+Usage:
+    from shared.events import get_publisher, get_subscriber
+
+    # Publishing
+    async with get_publisher() as pub:
+        await pub.publish("metrics.raw", {"machine_id": "m1", ...})
+
+    # Subscribing
+    async with get_subscriber(["metrics.raw", "alerts.*"]) as sub:
+        async for event in sub.consume():
+            process(event.topic, event.payload)
+"""
+
+from .base import EventPublisher, EventSubscriber, Event
+from .redis_pubsub import RedisPubSubPublisher, RedisPubSubSubscriber
+from .factory import get_publisher, get_subscriber
+
+__all__ = [
+    "EventPublisher",
+    "EventSubscriber",
+    "Event",
+    "RedisPubSubPublisher",
+    "RedisPubSubSubscriber",
+    "get_publisher",
+    "get_subscriber",
+]
diff --git a/shared/events/base.py b/shared/events/base.py
new file mode 100644
index 0000000..edbbcb8
--- /dev/null
+++ b/shared/events/base.py
@@ -0,0 +1,117 @@
+"""Abstract base classes for event publishing and subscribing."""
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, AsyncIterator
+import uuid
+
+
+@dataclass
+class Event:
+    """Standard event envelope."""
+    topic: str
+    payload: dict[str, Any]
+    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
+    timestamp: datetime = field(default_factory=datetime.utcnow)
+    source: str = ""
+
+    def to_dict(self) -> dict[str, Any]:
+        return {
+            "event_id": self.event_id,
+            "topic": self.topic,
+            "timestamp": self.timestamp.isoformat(),
+            "source": self.source,
+            "payload": self.payload,
+        }
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> "Event":
+        return cls(
+            event_id=data.get("event_id", str(uuid.uuid4())),
+            topic=data["topic"],
+            timestamp=datetime.fromisoformat(data["timestamp"]) if "timestamp" in data else datetime.utcnow(),
+            source=data.get("source", ""),
+            payload=data.get("payload", {}),
+        )
+
+
+class EventPublisher(ABC):
+    """Abstract base for event publishers."""
+
+    @abstractmethod
+    async def connect(self) -> None:
+        """Establish connection to the message broker."""
+        pass
+
+    @abstractmethod
+    async def disconnect(self) -> None:
+        """Close connection to the message broker."""
+        pass
+
+    @abstractmethod
+    async def publish(self, topic: str, payload: dict[str, Any], **kwargs) -> str:
+        """
+        Publish an event to a topic.
+
+        Args:
+            topic: The topic/channel to publish to
+            payload: The event data
+            **kwargs: Additional options (e.g., headers, partition key)
+
+        Returns:
+            The event ID
+        """
+        pass
+
+    async def publish_event(self, event: Event) -> str:
+        """Publish a pre-built Event (forwards topic, payload and event_id only)."""
+        return await self.publish(event.topic, event.payload, event_id=event.event_id)
+
+    async def __aenter__(self) -> "EventPublisher":
+        await self.connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+        await self.disconnect()
+
+
+class EventSubscriber(ABC):
+    """Abstract base for event subscribers."""
+
+    @abstractmethod
+    async def connect(self) -> None:
+        """Establish connection to the message broker."""
+        pass
+
+    @abstractmethod
+    async def disconnect(self) -> None:
+        """Close connection and unsubscribe."""
+        pass
+
+    @abstractmethod
+    async def subscribe(self, topics: list[str]) -> None:
+        """
+        Subscribe to one or more topics.
+
+        Args:
+            topics: List of topics/patterns to subscribe to
+        """
+        pass
+
+    @abstractmethod
+    async def consume(self) -> AsyncIterator[Event]:
+        """
+        Async generator that yields events as they arrive.
+
+        Yields:
+            Event objects
+        """
+        pass
+
+    async def __aenter__(self) -> "EventSubscriber":
+        await self.connect()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+        await self.disconnect()
diff --git a/shared/events/factory.py b/shared/events/factory.py
new file mode 100644
index 0000000..b7d94c3
--- /dev/null
+++ b/shared/events/factory.py
@@ -0,0 +1,101 @@
+"""Factory functions for creating event publishers and subscribers."""
+
+import os
+from enum import Enum
+
+from .base import EventPublisher, EventSubscriber
+from .redis_pubsub import RedisPubSubPublisher, RedisPubSubSubscriber
+
+
+class EventBackend(str, Enum):
+    """Supported event backends."""
+
+    REDIS_PUBSUB = "redis_pubsub"
+    REDIS_STREAMS = "redis_streams"  # Future
+    KAFKA = "kafka"  # Future
+
+
+def get_publisher(
+    backend: EventBackend | str | None = None,
+    source: str = "",
+    **kwargs,
+) -> EventPublisher:
+    """
+    Factory function to get an event publisher.
+
+    Args:
+        backend: The event backend to use (default: from EVENTS_BACKEND env var or redis_pubsub)
+        source: Identifier for the source service
+        **kwargs: Backend-specific options
+
+    Returns:
+        An EventPublisher instance
+
+    Environment variables:
+        EVENTS_BACKEND: Default backend (redis_pubsub, redis_streams, kafka)
+        REDIS_URL: Redis connection URL
+        KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (future)
+    """
+    if backend is None:
+        backend = os.getenv("EVENTS_BACKEND", EventBackend.REDIS_PUBSUB)
+
+    if isinstance(backend, str):
+        backend = EventBackend(backend)
+
+    if backend == EventBackend.REDIS_PUBSUB:
+        redis_url = kwargs.get("redis_url") or os.getenv(
+            "REDIS_URL", "redis://localhost:6379"
+        )
+        return RedisPubSubPublisher(redis_url=redis_url, source=source)
+
+    elif backend == EventBackend.REDIS_STREAMS:
+        raise NotImplementedError("Redis Streams backend not yet implemented")
+
+    elif backend == EventBackend.KAFKA:
+        raise NotImplementedError("Kafka backend not yet implemented")
+
+    else:
+        raise ValueError(f"Unknown event backend: {backend}")
+
+
+def get_subscriber(
+    topics: list[str] | None = None,
+    backend: EventBackend | str | None = None,
+    **kwargs,
+) -> EventSubscriber:
+    """
+    Factory function to get an event subscriber.
+
+    Args:
+        topics: Topics to subscribe to
+        backend: The event backend to use (default: from EVENTS_BACKEND env var or redis_pubsub)
+        **kwargs: Backend-specific options
+
+    Returns:
+        An EventSubscriber instance
+
+    Environment variables:
+        EVENTS_BACKEND: Default backend (redis_pubsub, redis_streams, kafka)
+        REDIS_URL: Redis connection URL
+        KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (future)
+    """
+    if backend is None:
+        backend = os.getenv("EVENTS_BACKEND", EventBackend.REDIS_PUBSUB)
+
+    if isinstance(backend, str):
+        backend = EventBackend(backend)
+
+    if backend == EventBackend.REDIS_PUBSUB:
+        redis_url = kwargs.get("redis_url") or os.getenv(
+            "REDIS_URL", "redis://localhost:6379"
+        )
+        return RedisPubSubSubscriber(redis_url=redis_url, topics=topics)
+
+    elif backend == EventBackend.REDIS_STREAMS:
+        raise NotImplementedError("Redis Streams backend not yet implemented")
+
+    elif backend == EventBackend.KAFKA:
+        raise NotImplementedError("Kafka backend not yet implemented")
+
+    else:
+        raise ValueError(f"Unknown event backend: {backend}")
diff --git a/shared/events/redis_pubsub.py b/shared/events/redis_pubsub.py
new file mode 100644
index 0000000..0bffe81
--- /dev/null
+++ b/shared/events/redis_pubsub.py
@@ -0,0 +1,142 @@
+"""Redis Pub/Sub implementation of event publishing/subscribing."""
+
+import asyncio
+import json
+import logging
+from typing import Any, AsyncIterator
+
+import redis.asyncio as redis
+
+from .base import Event, EventPublisher, EventSubscriber
+
+logger = logging.getLogger(__name__)
+
+
+class RedisPubSubPublisher(EventPublisher):
+    """Redis Pub/Sub based event publisher."""
+
+    def __init__(
+        self,
+        redis_url: str = "redis://localhost:6379",
+        source: str = "",
+    ):
+        self.redis_url = redis_url
+        self.source = source
+        self._client: redis.Redis | None = None
+
+    async def connect(self) -> None:
+        self._client = redis.from_url(self.redis_url, decode_responses=True)
+        await self._client.ping()
+        logger.info(f"Connected to Redis at {self.redis_url}")
+
+    async def disconnect(self) -> None:
+        if self._client:
+            await self._client.close()
+            self._client = None
+            logger.info("Disconnected from Redis")
+
+    async def publish(self, topic: str, payload: dict[str, Any], **kwargs) -> str:
+        if not self._client:
+            raise RuntimeError("Publisher not connected")
+
+        event = Event(topic=topic, payload=payload, source=self.source)
+        if kwargs.get("event_id"):
+            # Keep a caller-supplied ID (publish_event() passes one).
+            event.event_id = kwargs["event_id"]
+
+        message = json.dumps(event.to_dict())
+        await self._client.publish(topic, message)
+
+        logger.debug(f"Published event {event.event_id} to {topic}")
+        return event.event_id
+
+
+class RedisPubSubSubscriber(EventSubscriber):
+    """Redis Pub/Sub based event subscriber."""
+
+    def __init__(
+        self,
+        redis_url: str = "redis://localhost:6379",
+        topics: list[str] | None = None,
+    ):
+        self.redis_url = redis_url
+        self._topics = list(topics or [])  # copy: never mutate the caller's list
+        self._client: redis.Redis | None = None
+        self._pubsub: redis.client.PubSub | None = None
+        self._running = False
+
+    async def connect(self) -> None:
+        self._client = redis.from_url(self.redis_url, decode_responses=True)
+        await self._client.ping()
+        self._pubsub = self._client.pubsub()
+        logger.info(f"Connected to Redis at {self.redis_url}")
+
+        if self._topics:
+            await self.subscribe(self._topics)
+
+    async def disconnect(self) -> None:
+        self._running = False
+        if self._pubsub:
+            await self._pubsub.unsubscribe()
+            await self._pubsub.close()
+            self._pubsub = None
+        if self._client:
+            await self._client.close()
+            self._client = None
+            logger.info("Disconnected from Redis")
+
+    async def subscribe(self, topics: list[str]) -> None:
+        if not self._pubsub:
+            raise RuntimeError("Subscriber not connected")
+
+        # Separate pattern subscriptions from regular ones
+        patterns = [t for t in topics if "*" in t]
+        channels = [t for t in topics if "*" not in t]
+
+        if channels:
+            await self._pubsub.subscribe(*channels)
+            logger.info(f"Subscribed to channels: {channels}")
+
+        if patterns:
+            await self._pubsub.psubscribe(*patterns)
+            logger.info(f"Subscribed to patterns: {patterns}")
+
+        # connect() passes self._topics itself; extend() alone would double it.
+        self._topics.extend([t for t in topics if t not in self._topics])
+
+    async def consume(self) -> AsyncIterator[Event]:
+        """Yield events until disconnect() is called or the task is cancelled."""
+        if not self._pubsub:
+            raise RuntimeError("Subscriber not connected")
+
+        self._running = True
+
+        while self._running:
+            try:
+                message = await self._pubsub.get_message(
+                    ignore_subscribe_messages=True,
+                    timeout=1.0,
+                )
+
+                if message is None:
+                    await asyncio.sleep(0.01)
+                    continue
+
+                if message["type"] not in ("message", "pmessage"):
+                    continue
+
+                try:
+                    event = Event.from_dict(json.loads(message["data"]))
+                except (KeyError, TypeError, ValueError, AttributeError) as e:
+                    # ValueError also covers json.JSONDecodeError.
+                    logger.warning(f"Failed to parse event: {e}")
+                    continue
+                yield event
+
+            except asyncio.CancelledError:
+                # Stop the loop, but let the cancellation reach the task.
+                self._running = False
+                raise
+            except Exception as e:
+                logger.error(f"Error consuming events: {e}")
+                await asyncio.sleep(1.0)