Compare commits

..

4 Commits

Author SHA1 Message Date
buenosairesam
0cd8d1516f sysmonstm setup
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2026-01-22 18:30:30 -03:00
buenosairesam
82c4551e71 simple is better 2026-01-22 16:22:15 -03:00
buenosairesam
dc3518f138 new three layer deployment 2026-01-22 12:55:50 -03:00
buenosairesam
174bc15368 add readme 2026-01-22 06:02:01 -03:00
14 changed files with 691 additions and 669 deletions

533
CLAUDE.md
View File

@@ -2,491 +2,90 @@
## Project Overview
A real-time system monitoring platform that streams metrics from multiple machines to a central hub with live web dashboard. Built to demonstrate production microservices patterns (gRPC, FastAPI, streaming, event-driven architecture) while solving a real problem: monitoring development infrastructure across multiple machines.
A real-time system monitoring platform that streams metrics from multiple machines to a central hub with live web dashboard. Built to demonstrate production microservices patterns (gRPC, FastAPI, streaming, event-driven architecture).
**Primary Goal:** Interview demonstration project for Python Microservices Engineer position
**Secondary Goal:** Actually useful tool for managing multi-machine development environment
**Time Investment:** Phased approach - MVP in weekend, polish over 2-3 weeks
## Why This Project
**Interview Alignment:**
- Demonstrates gRPC-based microservices architecture (core requirement)
- Shows streaming patterns (server-side and bidirectional)
- Real-time data aggregation and processing
- Alert/threshold monitoring (maps to fraud detection)
- Event-driven patterns
- Multiple data sources requiring normalization (maps to multiple payment processors)
**Personal Utility:**
- Monitors existing multi-machine dev setup
- Dashboard stays open, provides real value
- Solves actual pain point
- Will continue running post-interview
**Domain Mapping for Interview:**
- Machine = Payment Processor
- Metrics Stream = Transaction Stream
- Resource Thresholds = Fraud/Limit Detection
- Alert System = Risk Management
- Aggregation Service = Payment Processing Hub
## Technical Stack
### Core Technologies (Must Use - From JD)
- **Python 3.11+** - Primary language
- **FastAPI** - Web gateway, REST endpoints, WebSocket streaming
- **gRPC** - Inter-service communication, metric streaming
- **PostgreSQL/TimescaleDB** - Time-series historical data
- **Redis** - Current state, caching, alert rules
- **Docker Compose** - Orchestration
### Supporting Technologies
- **Protocol Buffers** - gRPC message definitions
- **WebSockets** - Browser streaming
- **htmx + Alpine.js** - Lightweight reactive frontend (avoid heavy SPA)
- **Chart.js or Apache ECharts** - Real-time graphs
- **asyncio** - Async patterns throughout
### Development Tools
- **grpcio & grpcio-tools** - Python gRPC
- **psutil** - System metrics collection
- **uvicorn** - FastAPI server
- **pytest** - Testing
- **docker-compose** - Local orchestration
**Primary Goal:** Portfolio project demonstrating real-time streaming with gRPC
**Status:** Working, deployed at sysmonstm.mcrn.ar
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
Browser │
┌──────────────────────────────────────────────────────┐ │
Dashboard (htmx + Alpine.js + WebSockets) │ │
│ └──────────────────────────────────────────────────────┘
└────────────────────────┬────────────────────────────────────┘
│ WebSocket
┌─────────────────────────────────────────────────────────────┐
│ Web Gateway Service │
│ (FastAPI + WebSockets) │
│ - Serves dashboard │
│ - Streams updates to browser │
│ - REST API for historical queries
└────────────────────────┬────────────────────────────────────┘
│ gRPC
┌─────────────────────────────────────────────────────────────┐
│ Aggregator Service (gRPC) │
│ - Receives metric streams from all collectors │
- Normalizes data from different sources │
- Enriches with machine context │
- Publishes to event stream │
- Checks alert thresholds │
└─────┬───────────────────────────────────┬───────────────────┘
│ │
│ Stores │ Publishes events
▼ ▼
┌──────────────┐ ┌────────────────┐
│ TimescaleDB │ │ Event Stream │
│ (historical)│ │ (Redis Pub/Sub│
└──────────────┘ │ or RabbitMQ) │
└────────┬───────┘
┌──────────────┐ │
│ Redis │ │ Subscribes
│ (current │◄───────────────────────────┘
│ state) │ │
└──────────────┘ ▼
┌────────────────┐
▲ │ Alert Service │
│ │ - Processes │
│ │ events │
│ gRPC Streaming │ - Triggers │
│ │ actions │
┌─────┴────────────────────────────┴────────────────┘
┌─────────────┐ ┌─────────────────────────────────────┐ ┌─────────────┐
Collector │────▶│ Aggregator + Gateway + Redis + TS │────▶│ Edge │────▶ Browser
(mcrn) │gRPC │ (LOCAL) │ WS │ (AWS) │ WS
└─────────────┘ └─────────────────────────────────────┘ └─────────────┘
┌─────────────┐
│ Collector │────────────────────┘
│ (nfrt) │gRPC
└─────────────┘
```
- **Collectors** (`services/collector/`) - gRPC clients on each monitored machine
- **Aggregator** (`services/aggregator/`) - gRPC server, stores in Redis/TimescaleDB
- **Gateway** (`services/gateway/`) - FastAPI, bridges gRPC to WebSocket, forwards to edge
- **Edge** (`ctrl/edge/`) - Simple WebSocket relay for AWS, serves public dashboard
## Directory Structure
```
sms/
├── services/ # gRPC-based microservices
├── collector/ # gRPC client, streams to aggregator
├── aggregator/ # gRPC server, stores in Redis/TimescaleDB
├── gateway/ # FastAPI, WebSocket, forwards to edge
└── alerts/ # Event subscriber for threshold alerts
│ Multiple Collector Services (one per machine)
┌───────────────────────────────────────┐
│ Metrics Collector (gRPC Client) │
│ │ - Gathers system metrics (psutil) │
│ │ - Streams to Aggregator via gRPC │
│ │ - CPU, Memory, Disk, Network │
│ │ - Process list │
│ │ - Docker container stats (optional) │
│ └───────────────────────────────────────┘
├── ctrl/ # Deployment configurations
├── dev/ # Full stack docker-compose
└── edge/ # Cloud dashboard (AWS)
──► Machine 1, Machine 2, Machine 3, ...
── proto/ # Protocol Buffer definitions
├── shared/ # Shared Python modules (config, logging, events)
└── web/ # Dashboard templates and static files
```
## Implementation Phases
## Running
### Phase 1: MVP - Core Streaming (Weekend - 8-12 hours)
**Goal:** Prove the gRPC streaming works end-to-end
**Deliverables:**
1. Metrics Collector Service (gRPC client)
- Collects CPU, memory, disk on localhost
- Streams to aggregator every 5 seconds
2. Aggregator Service (gRPC server)
- Receives metric stream
- Stores current state in Redis
- Logs to console
3. Proto definitions for metric messages
4. Docker Compose setup
**Success Criteria:**
- Run collector, see metrics flowing to aggregator
- Redis contains current state
- Can query Redis manually for latest metrics
### Phase 2: Web Dashboard (1 week)
**Goal:** Make it visible and useful
**Deliverables:**
1. Web Gateway Service (FastAPI)
- WebSocket endpoint for streaming
- REST endpoints for current/historical data
2. Dashboard UI
- Real-time CPU/Memory graphs per machine
- Current state table
- Simple, clean design
3. WebSocket bridge (Gateway ↔ Aggregator)
4. TimescaleDB integration
- Store historical metrics
- Query endpoints for time ranges
**Success Criteria:**
- Open dashboard, see live graphs updating
- Graphs show last hour of data
- Multiple machines displayed separately
### Phase 3: Alerts & Intelligence (1 week)
**Goal:** Add decision-making layer (interview focus)
**Deliverables:**
1. Alert Service
- Subscribes to event stream
- Evaluates threshold rules
- Triggers notifications
2. Configuration Service (gRPC)
- Dynamic threshold management
- Alert rule CRUD
- Stored in PostgreSQL
3. Event Stream implementation (Redis Pub/Sub or RabbitMQ)
4. Enhanced dashboard
- Alert indicators
- Alert history
- Threshold configuration UI
**Success Criteria:**
- Set CPU threshold at 80%
- Generate load (stress-ng)
- See alert trigger in dashboard
- Alert logged to database
### Phase 4: Interview Polish (Final week)
**Goal:** Demo-ready, production patterns visible
**Deliverables:**
1. Observability
- OpenTelemetry tracing (optional)
- Structured logging
- Health check endpoints
2. "Synthetic Transactions"
- Simulate business operations through system
- Track end-to-end latency
- Maps directly to payment processing demo
3. Documentation
- Architecture diagram
- Service interaction flows
- Deployment guide
4. Demo script
- Story to walk through
- Key talking points
- Domain mapping explanations
**Success Criteria:**
- Can deploy entire stack with one command
- Can explain every service's role
- Can map architecture to payment processing
- Demo runs smoothly without hiccups
## Key Technical Patterns to Demonstrate
### 1. gRPC Streaming Patterns
**Server-Side Streaming:**
```python
# Collector streams metrics to aggregator
service MetricsService {
rpc StreamMetrics(MetricsRequest) returns (stream Metric) {}
}
### Local Development
```bash
docker compose up
```
**Bidirectional Streaming:**
```python
# Two-way communication between services
service ControlService {
rpc ManageStream(stream Command) returns (stream Response) {}
}
### With Edge Forwarding (to AWS)
```bash
EDGE_URL=wss://sysmonstm.mcrn.ar/ws docker compose up
```
### 2. Service Communication Patterns
- **Synchronous (gRPC):** Query current state, configuration
- **Asynchronous (Events):** Metric updates, alerts, audit logs
- **Streaming (gRPC + WebSocket):** Real-time data flow
### 3. Data Storage Patterns
- **Hot data (Redis):** Current state, recent metrics (last 5 minutes)
- **Warm data (TimescaleDB):** Historical metrics (last 30 days)
- **Cold data (Optional):** Archive to S3-compatible storage
### 4. Error Handling & Resilience
- gRPC retry logic with exponential backoff
- Circuit breaker pattern for service calls
- Graceful degradation (continue if one collector fails)
- Dead letter queue for failed events
## Proto Definitions (Starting Point)
```protobuf
syntax = "proto3";
package monitoring;
service MetricsService {
rpc StreamMetrics(MetricsRequest) returns (stream Metric) {}
rpc GetCurrentState(StateRequest) returns (MachineState) {}
}
message MetricsRequest {
string machine_id = 1;
int32 interval_seconds = 2;
}
message Metric {
string machine_id = 1;
int64 timestamp = 2;
MetricType type = 3;
double value = 4;
map<string, string> labels = 5;
}
enum MetricType {
CPU_PERCENT = 0;
MEMORY_PERCENT = 1;
MEMORY_USED_GB = 2;
DISK_PERCENT = 3;
NETWORK_SENT_MBPS = 4;
NETWORK_RECV_MBPS = 5;
}
message MachineState {
string machine_id = 1;
int64 last_seen = 2;
repeated Metric current_metrics = 3;
HealthStatus health = 4;
}
enum HealthStatus {
HEALTHY = 0;
WARNING = 1;
CRITICAL = 2;
UNKNOWN = 3;
}
### Collector on Remote Machine
```bash
docker run -d --network host \
-e AGGREGATOR_URL=<local-ip>:50051 \
-e MACHINE_ID=$(hostname) \
registry.mcrn.ar/sysmonstm/collector:latest
```
## Project Structure
## Technical Stack
```
system-monitor/
├── docker-compose.yml
├── proto/
│ └── metrics.proto
├── services/
│ ├── collector/
│ │ ├── Dockerfile
│ │ ├── requirements.txt
│ │ ├── main.py
│ │ └── metrics.py
│ ├── aggregator/
│ │ ├── Dockerfile
│ │ ├── requirements.txt
│ │ ├── main.py
│ │ └── storage.py
│ ├── gateway/
│ │ ├── Dockerfile
│ │ ├── requirements.txt
│ │ ├── main.py
│ │ └── websocket.py
│ └── alerts/
│ ├── Dockerfile
│ ├── requirements.txt
│ ├── main.py
│ └── rules.py
├── web/
│ ├── static/
│ │ ├── css/
│ │ └── js/
│ └── templates/
│ └── dashboard.html
└── README.md
```
- **Python 3.11+**
- **gRPC** - Collector to aggregator communication (showcased)
- **FastAPI** - Gateway REST/WebSocket
- **Redis** - Pub/Sub events, current state cache
- **TimescaleDB** - Historical metrics storage
- **WebSocket** - Gateway to edge, edge to browser
## Interview Talking Points
## Key Files
### Domain Mapping to Payments
| File | Purpose |
|------|---------|
| `proto/metrics.proto` | gRPC service and message definitions |
| `services/collector/main.py` | gRPC streaming client |
| `services/aggregator/main.py` | gRPC server, metric processing |
| `services/gateway/main.py` | WebSocket bridge, edge forwarding |
| `ctrl/edge/edge.py` | Simple WebSocket relay for AWS |
**What you say:**
- "I built this to monitor my dev machines, but the architecture directly maps to payment processing"
- "Each machine streaming metrics is like a payment processor streaming transactions"
- "The aggregator normalizes data from different sources - same as aggregating from Stripe, PayPal, bank APIs"
- "Alert thresholds on resource usage are structurally identical to fraud detection thresholds"
- "The event stream for audit trails maps directly to payment audit logs"
## Portfolio Talking Points
### Technical Decisions to Highlight
**gRPC vs REST:**
- "I use gRPC between services for efficiency and strong typing"
- "FastAPI gateway exposes REST/WebSocket for browser clients"
- "This pattern is common - internal gRPC, external REST"
**Streaming vs Polling:**
- "Server-side streaming reduces network overhead"
- "Bidirectional streaming allows dynamic configuration updates"
- "WebSocket to browser maintains single connection"
**State Management:**
- "Redis for hot data - current state, needs fast access"
- "TimescaleDB for historical analysis - optimized for time-series"
- "This tiered storage approach scales to payment transaction volumes"
**Resilience:**
- "Each collector is independent - one failing doesn't affect others"
- "Circuit breaker prevents cascade failures"
- "Event stream decouples alert processing from metric ingestion"
### What NOT to Say
- Don't call it a "toy project" or "learning exercise"
- Don't apologize for running locally vs AWS
- Don't over-explain obvious things
- Don't claim it's production-ready when it's not
### What TO Say
- "I built this to solve a real problem I have"
- "Locally it uses PostgreSQL/Redis, in production these become Aurora/ElastiCache"
- "I focused on the architectural patterns since those transfer directly"
- "I'd keep developing this - it's genuinely useful"
## Development Guidelines
### Code Quality Standards
- Type hints throughout (Python 3.11+ syntax)
- Async/await patterns consistently
- Structured logging (JSON format)
- Error handling at all boundaries
- Unit tests for business logic
- Integration tests for service interactions
### Docker Best Practices
- Multi-stage builds
- Non-root users
- Health checks
- Resource limits
- Volume mounts for development
### Configuration Management
- Environment variables for all config
- Sensible defaults
- Config validation on startup
- No secrets in code
## AWS Mapping (For Interview Discussion)
**What you have → What it becomes:**
- PostgreSQL → Aurora PostgreSQL
- Redis → ElastiCache
- Docker Containers → ECS/Fargate or Lambda
- RabbitMQ/Redis Pub/Sub → SQS/SNS
- Docker Compose → CloudFormation/Terraform
- Local networking → VPC, Security Groups
**Key point:** "The architecture and patterns are production-ready, the infrastructure is local for development convenience"
## Common Pitfalls to Avoid
1. **Over-engineering Phase 1** - Resist adding features, just get streaming working
2. **Ugly UI** - Don't waste time on design, htmx + basic CSS is fine
3. **Perfect metrics** - Mock data is OK early on, real psutil data comes later
4. **Complete coverage** - Better to have 3 services working perfectly than 10 half-done
5. **AWS deployment** - Local is fine, AWS costs money and adds complexity
## Success Metrics
**For Yourself:**
- [ ] Actually use the dashboard daily
- [ ] Catches a real issue before you notice
- [ ] Runs stable for 1+ week without intervention
**For Interview:**
- [ ] Can demo end-to-end in 5 minutes
- [ ] Can explain every service interaction
- [ ] Can map to payment domain fluently
- [ ] Shows understanding of production patterns
## Next Steps
1. Set up project structure
2. Define proto messages
3. Build Phase 1 MVP
4. Iterate based on what feels useful
5. Polish for demo when interview approaches
## Resources
- gRPC Python docs: https://grpc.io/docs/languages/python/
- FastAPI WebSockets: https://fastapi.tiangolo.com/advanced/websockets/
- TimescaleDB: https://docs.timescale.com/
- htmx: https://htmx.org/
## Questions to Ask Yourself During Development
- "Would I actually use this feature?"
- "How does this map to payments?"
- "Can I explain why I built it this way?"
- "What would break if X service failed?"
- "How would this scale to 1000 machines?"
---
## Final Note
This project works because it's:
1. **Real** - You'll use it
2. **Focused** - Shows specific patterns they care about
3. **Mappable** - Clear connection to their domain
4. **Yours** - Not a tutorial copy, demonstrates your thinking
Build it in phases, use it daily, and by interview time you'll have natural stories about trade-offs, failures, and learnings. That authenticity is more valuable than perfect code.
Good luck! 🚀
- **gRPC streaming** - Efficient binary protocol for real-time metrics
- **Event-driven** - Redis Pub/Sub decouples processing from delivery
- **Edge pattern** - Heavy processing local, lightweight relay in cloud
- **Cost optimization** - ~$10/mo for public dashboard (data transfer, not requests)

214
README.md Normal file
View File

@@ -0,0 +1,214 @@
# sysmonstm
A real-time distributed system monitoring platform that streams metrics from multiple machines to a central hub with a live web dashboard.
## Overview
sysmonstm demonstrates production microservices patterns (gRPC streaming, FastAPI, event-driven architecture) while solving a real problem: monitoring development infrastructure across multiple machines.
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Collector │ │ Collector │ │ Collector │
│ (Machine 1) │ │ (Machine 2) │ │ (Machine N) │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
│ gRPC Streaming │
└───────────────────────┼───────────────────────┘
┌────────────────────────┐
│ Aggregator │
│ (gRPC Server + Redis │
│ + TimescaleDB) │
└────────────┬───────────┘
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌──────────────┐ ┌──────────────┐
│ Gateway │ │ Alerts │ │ Event Stream│
│ (FastAPI + WS) │ │ Service │ │ (Redis PubSub│
└────────┬───────┘ └──────────────┘ └──────────────┘
│ WebSocket
┌────────────────┐
│ Browser │
│ Dashboard │
└────────────────┘
```
## Features
- **Real-time streaming**: Collectors stream metrics via gRPC to central aggregator
- **Multi-machine support**: Monitor any number of machines from a single dashboard
- **Live dashboard**: WebSocket-powered updates with real-time graphs
- **Tiered storage**: Redis for hot data, TimescaleDB for historical analysis
- **Threshold alerts**: Configurable rules for CPU, memory, disk usage
- **Event-driven**: Decoupled services via Redis Pub/Sub
## Quick Start
```bash
# Start the full stack
docker compose up
# Open dashboard
open http://localhost:8000
```
Metrics appear within seconds. The collector runs locally by default.
### Monitor Additional Machines
Run the collector on any machine you want to monitor:
```bash
# On a remote machine, point to your aggregator
COLLECTOR_AGGREGATOR_URL=your-server:50051 \
COLLECTOR_MACHINE_ID=my-laptop \
python services/collector/main.py
```
## Architecture
### Services
| Service | Port | Description |
|---------|------|-------------|
| **Collector** | - | gRPC client that streams system metrics (CPU, memory, disk, network) |
| **Aggregator** | 50051 | gRPC server that receives metrics, stores them, publishes events |
| **Gateway** | 8000 | FastAPI server with REST API and WebSocket for dashboard |
| **Alerts** | - | Subscribes to events, evaluates threshold rules, triggers notifications |
### Infrastructure
| Component | Purpose |
|-----------|---------|
| **Redis** | Current state cache, event pub/sub |
| **TimescaleDB** | Historical metrics with automatic downsampling |
### Key Patterns
- **gRPC Streaming**: Collectors stream metrics continuously to the aggregator
- **Event-Driven**: Services communicate via Redis Pub/Sub for decoupling
- **Tiered Storage**: Hot data in Redis, historical in TimescaleDB
- **Graceful Degradation**: System continues partially if storage fails
## Project Structure
```
sysmonstm/
├── proto/
│ └── metrics.proto # gRPC service definitions
├── services/
│ ├── collector/ # Metrics collection (psutil)
│ ├── aggregator/ # Central gRPC server
│ ├── gateway/ # FastAPI + WebSocket
│ └── alerts/ # Threshold evaluation
├── shared/
│ ├── config.py # Pydantic settings
│ ├── logging.py # Structured JSON logging
│ └── events/ # Event pub/sub abstraction
├── web/
│ ├── static/ # CSS, JS
│ └── templates/ # Dashboard HTML
├── scripts/
│ └── init-db.sql # TimescaleDB schema
├── docs/ # Architecture diagrams & explainers
├── docker-compose.yml
└── Tiltfile # Local Kubernetes dev
```
## Configuration
All services use environment variables with sensible defaults:
```bash
# Collector
COLLECTOR_MACHINE_ID=my-machine # Machine identifier
COLLECTOR_AGGREGATOR_URL=localhost:50051
COLLECTOR_COLLECTION_INTERVAL=5 # Seconds between collections
# Common
REDIS_URL=redis://localhost:6379
TIMESCALE_URL=postgresql://monitor:monitor@localhost:5432/monitor
LOG_LEVEL=INFO
LOG_FORMAT=json
```
## Metrics Collected
- CPU: Overall percentage, per-core usage
- Memory: Percentage, used/available bytes
- Disk: Percentage, used bytes, read/write throughput
- Network: Bytes sent/received per second, connection count
- System: Process count, load averages (1m, 5m, 15m)
## Development
### Local Development with Hot Reload
```bash
# Use the override file for volume mounts
docker compose -f docker-compose.yml -f docker-compose.override.yml up
```
### Kubernetes Development with Tilt
```bash
tilt up
```
### Running Services Individually
```bash
# Install dependencies
python -m venv .venv
source .venv/bin/activate
pip install -r services/collector/requirements.txt
# Generate protobuf code
python -m grpc_tools.protoc -I proto --python_out=. --grpc_python_out=. proto/metrics.proto
# Run a service
python services/collector/main.py
```
## API Endpoints
### REST (Gateway)
| Endpoint | Description |
|----------|-------------|
| `GET /` | Dashboard UI |
| `GET /api/machines` | List all monitored machines |
| `GET /api/machines/{id}/metrics` | Current metrics for a machine |
| `GET /api/machines/{id}/history` | Historical metrics |
| `GET /health` | Health check |
| `GET /ready` | Readiness check (includes dependencies) |
### WebSocket
Connect to `ws://localhost:8000/ws` for real-time metric updates.
## Documentation
Detailed documentation is available in the `docs/` folder:
- [Architecture Diagrams](docs/architecture/) - System overview, data flow, deployment
- [Building sysmonstm](docs/explainer/sysmonstm-from-start-to-finish.md) - Deep dive into implementation decisions
- [Domain Applications](docs/explainer/other-applications.md) - How these patterns apply to payment processing and other domains
## Tech Stack
- **Python 3.11+** with async/await throughout
- **gRPC** for inter-service communication
- **FastAPI** for REST API and WebSocket
- **Redis** for caching and pub/sub
- **TimescaleDB** for time-series storage
- **psutil** for system metrics collection
- **Docker Compose** for orchestration
## License
MIT

72
ctrl/README.md Normal file
View File

@@ -0,0 +1,72 @@
# Deployment Configurations
## Architecture
```
┌─────────────┐ ┌─────────────────────────────────────┐ ┌─────────────┐
│ Collector │────▶│ Aggregator + Gateway + Redis + TS │────▶│ Edge │────▶ Browser
│ (mcrn) │gRPC │ (LOCAL) │ WS │ (AWS) │ WS
└─────────────┘ └─────────────────────────────────────┘ └─────────────┘
┌─────────────┐ │
│ Collector │────────────────────┘
│ (nfrt) │gRPC
└─────────────┘
```
- **Collectors** use gRPC to stream metrics to the local aggregator
- **Gateway** forwards to edge via WebSocket (if `EDGE_URL` configured)
- **Edge** (AWS) relays to browsers via WebSocket
## Directory Structure
```
ctrl/
├── dev/ # Full stack for local development (docker-compose)
└── edge/ # Cloud dashboard for AWS deployment
```
## Local Development
```bash
# From repo root
docker compose up
```
Runs: aggregator, gateway, collector, alerts, redis, timescaledb
## Production Deployment
### 1. Deploy Edge to AWS
```bash
cd ctrl/edge
docker compose up -d
```
### 2. Run Full Stack Locally with Edge Forwarding
```bash
EDGE_URL=wss://sysmonstm.mcrn.ar/ws EDGE_API_KEY=xxx docker compose up
```
### 3. Run Collectors on Other Machines
```bash
docker run -d --name sysmonstm-collector --network host \
-e AGGREGATOR_URL=<local-gateway-ip>:50051 \
-e MACHINE_ID=$(hostname) \
registry.mcrn.ar/sysmonstm/collector:latest
```
## Environment Variables
### Gateway (for edge forwarding)
- `EDGE_URL` - WebSocket URL of edge (e.g., wss://sysmonstm.mcrn.ar/ws)
- `EDGE_API_KEY` - Authentication key for edge
### Edge
- `API_KEY` - Key required from gateway
### Collector
- `AGGREGATOR_URL` - gRPC URL of aggregator (e.g., localhost:50051)
- `MACHINE_ID` - Identifier for this machine

154
ctrl/dev/docker-compose.yml Normal file
View File

@@ -0,0 +1,154 @@
version: "3.8"
# This file works both locally and on EC2 for demo purposes.
# For local dev with hot-reload, use: docker compose -f docker-compose.yml -f docker-compose.override.yml up
x-common-env: &common-env
REDIS_URL: redis://redis:6379
TIMESCALE_URL: postgresql://monitor:monitor@timescaledb:5432/monitor
EVENTS_BACKEND: redis_pubsub
LOG_LEVEL: ${LOG_LEVEL:-INFO}
LOG_FORMAT: json
x-healthcheck-defaults: &healthcheck-defaults
interval: 10s
timeout: 5s
retries: 3
start_period: 10s
services:
# =============================================================================
# Infrastructure
# =============================================================================
redis:
image: redis:7-alpine
ports:
- "${REDIS_PORT:-6379}:6379"
volumes:
- redis-data:/data
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "redis-cli", "ping"]
deploy:
resources:
limits:
memory: 128M
timescaledb:
image: timescale/timescaledb:latest-pg15
environment:
POSTGRES_USER: monitor
POSTGRES_PASSWORD: monitor
POSTGRES_DB: monitor
ports:
- "${TIMESCALE_PORT:-5432}:5432"
volumes:
- timescale-data:/var/lib/postgresql/data
- ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro
healthcheck:
<<: *healthcheck-defaults
test: ["CMD-SHELL", "pg_isready -U monitor -d monitor"]
deploy:
resources:
limits:
memory: 512M
# =============================================================================
# Application Services
# =============================================================================
aggregator:
build:
context: .
dockerfile: services/aggregator/Dockerfile
environment:
<<: *common-env
GRPC_PORT: 50051
SERVICE_NAME: aggregator
ports:
- "${AGGREGATOR_GRPC_PORT:-50051}:50051"
depends_on:
redis:
condition: service_healthy
timescaledb:
condition: service_healthy
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "/bin/grpc_health_probe", "-addr=:50051"]
deploy:
resources:
limits:
memory: 256M
gateway:
build:
context: .
dockerfile: services/gateway/Dockerfile
environment:
<<: *common-env
HTTP_PORT: 8000
AGGREGATOR_URL: aggregator:50051
SERVICE_NAME: gateway
ports:
- "${GATEWAY_PORT:-8000}:8000"
depends_on:
- aggregator
- redis
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
deploy:
resources:
limits:
memory: 256M
alerts:
build:
context: .
dockerfile: services/alerts/Dockerfile
environment:
<<: *common-env
SERVICE_NAME: alerts
depends_on:
redis:
condition: service_healthy
timescaledb:
condition: service_healthy
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "python", "-c", "import sys; sys.exit(0)"]
deploy:
resources:
limits:
memory: 128M
# Collector runs separately on each machine being monitored
# For local testing, we run one instance
collector:
build:
context: .
dockerfile: services/collector/Dockerfile
environment:
<<: *common-env
AGGREGATOR_URL: aggregator:50051
MACHINE_ID: ${MACHINE_ID:-local-dev}
COLLECTION_INTERVAL: ${COLLECTION_INTERVAL:-5}
SERVICE_NAME: collector
depends_on:
- aggregator
deploy:
resources:
limits:
memory: 64M
# For actual system metrics, you might need:
# privileged: true
# pid: host
volumes:
redis-data:
timescale-data:
networks:
default:
name: sysmonstm

14
ctrl/edge/Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir fastapi uvicorn[standard] websockets
COPY edge.py .
ENV API_KEY=""
ENV LOG_LEVEL=INFO
EXPOSE 8080
CMD ["uvicorn", "edge:app", "--host", "0.0.0.0", "--port", "8080"]

View File

@@ -0,0 +1,16 @@
services:
edge:
image: registry.mcrn.ar/sysmonstm/edge:latest
container_name: sysmonstm-edge
restart: unless-stopped
environment:
- API_KEY=${API_KEY:-}
- LOG_LEVEL=${LOG_LEVEL:-INFO}
ports:
- "8080:8080"
networks:
- gateway
networks:
gateway:
external: true

View File

@@ -1,11 +1,26 @@
"""Minimal sysmonstm gateway - standalone mode without dependencies."""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import json
import asyncio
import json
import logging
import os
from datetime import datetime
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
# Configuration
API_KEY = os.environ.get("API_KEY", "")
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
# Logging setup
logging.basicConfig(
level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("gateway")
app = FastAPI(title="sysmonstm")
# Store connected websockets
@@ -107,7 +122,8 @@ HTML = """
let machines = {};
function connect() {
const ws = new WebSocket(`wss://${location.host}/ws`);
const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
const ws = new WebSocket(`${protocol}//${location.host}/ws`);
ws.onopen = () => {
statusDot.classList.add('ok');
@@ -130,7 +146,7 @@ HTML = """
}
function render() {
const ids = Object.keys(machines);
const ids = Object.keys(machines).sort();
if (ids.length === 0) {
machinesEl.innerHTML = '<div class="empty"><h2>No collectors connected</h2><p>Start a collector to see metrics</p></div>';
return;
@@ -138,13 +154,16 @@ HTML = """
machinesEl.innerHTML = ids.map(id => {
const m = machines[id];
const ts = m.timestamp ? new Date(m.timestamp * 1000).toLocaleTimeString() : '-';
return `
<div class="machine">
<h3>${id}</h3>
<div class="metric"><span>CPU</span><span>${m.cpu?.toFixed(1) || '-'}%</span></div>
<div class="metric"><span>Memory</span><span>${m.memory?.toFixed(1) || '-'}%</span></div>
<div class="metric"><span>Disk</span><span>${m.disk?.toFixed(1) || '-'}%</span></div>
<div class="metric"><span>Updated</span><span>${new Date(m.timestamp).toLocaleTimeString()}</span></div>
<div class="metric"><span>Load (1m)</span><span>${m.load_1m?.toFixed(2) || '-'}</span></div>
<div class="metric"><span>Processes</span><span>${m.processes || '-'}</span></div>
<div class="metric"><span>Updated</span><span>${ts}</span></div>
</div>
`;
}).join('');
@@ -156,43 +175,82 @@ HTML = """
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def index():
return HTML
@app.get("/health")
async def health():
return {"status": "ok", "machines": len(machines)}
@app.get("/api/machines")
async def get_machines():
return machines
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
async def websocket_endpoint(websocket: WebSocket, key: str = Query(default="")):
# API key validation for collectors (browsers don't need key)
# Check if this looks like a collector (will send metrics) or browser (will receive)
# We validate key only when metrics are received, allowing browsers to connect freely
await websocket.accept()
connections.append(websocket)
client = websocket.client.host if websocket.client else "unknown"
log.info(f"WebSocket connected: {client}")
try:
# Send current state
# Send current state to new connection
for machine_id, data in machines.items():
await websocket.send_json({"type": "metrics", "machine_id": machine_id, **data})
# Keep alive
await websocket.send_json(
{"type": "metrics", "machine_id": machine_id, **data}
)
# Main loop
while True:
try:
msg = await asyncio.wait_for(websocket.receive_text(), timeout=30)
data = json.loads(msg)
if data.get("type") == "metrics":
# Validate API key for metric submissions
if API_KEY and key != API_KEY:
log.warning(f"Invalid API key from {client}")
await websocket.close(code=4001, reason="Invalid API key")
return
machine_id = data.get("machine_id", "unknown")
machines[machine_id] = {**data, "timestamp": datetime.utcnow().isoformat()}
# Broadcast to all
machines[machine_id] = data
log.debug(f"Metrics from {machine_id}: cpu={data.get('cpu')}%")
# Broadcast to all connected clients
for conn in connections:
try:
await conn.send_json({"type": "metrics", "machine_id": machine_id, **machines[machine_id]})
except:
await conn.send_json(
{"type": "metrics", "machine_id": machine_id, **data}
)
except Exception:
pass
except asyncio.TimeoutError:
# Send ping to keep connection alive
await websocket.send_json({"type": "ping"})
except WebSocketDisconnect:
pass
log.info(f"WebSocket disconnected: {client}")
except Exception as e:
log.error(f"WebSocket error: {e}")
finally:
if websocket in connections:
connections.remove(websocket)
if __name__ == "__main__":
import uvicorn
log.info("Starting sysmonstm gateway")
log.info(f" API key: {'configured' if API_KEY else 'not set (open)'}")
uvicorn.run(app, host="0.0.0.0", port=8080)

View File

@@ -1,6 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir fastapi uvicorn[standard] websockets
COPY main.py .
EXPOSE 8080
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]

View File

@@ -1,13 +0,0 @@
services:
sysmonstm:
build: .
container_name: sysmonstm
restart: unless-stopped
ports:
- "8080:8080"
networks:
- gateway
networks:
gateway:
external: true

View File

@@ -1,154 +0,0 @@
version: "3.8"
# This file works both locally and on EC2 for demo purposes.
# For local dev with hot-reload, use: docker compose -f docker-compose.yml -f docker-compose.override.yml up
x-common-env: &common-env
REDIS_URL: redis://redis:6379
TIMESCALE_URL: postgresql://monitor:monitor@timescaledb:5432/monitor
EVENTS_BACKEND: redis_pubsub
LOG_LEVEL: ${LOG_LEVEL:-INFO}
LOG_FORMAT: json
x-healthcheck-defaults: &healthcheck-defaults
interval: 10s
timeout: 5s
retries: 3
start_period: 10s
services:
# =============================================================================
# Infrastructure
# =============================================================================
redis:
image: redis:7-alpine
ports:
- "${REDIS_PORT:-6379}:6379"
volumes:
- redis-data:/data
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "redis-cli", "ping"]
deploy:
resources:
limits:
memory: 128M
timescaledb:
image: timescale/timescaledb:latest-pg15
environment:
POSTGRES_USER: monitor
POSTGRES_PASSWORD: monitor
POSTGRES_DB: monitor
ports:
- "${TIMESCALE_PORT:-5432}:5432"
volumes:
- timescale-data:/var/lib/postgresql/data
- ./scripts/init-db.sql:/docker-entrypoint-initdb.d/init.sql:ro
healthcheck:
<<: *healthcheck-defaults
test: ["CMD-SHELL", "pg_isready -U monitor -d monitor"]
deploy:
resources:
limits:
memory: 512M
# =============================================================================
# Application Services
# =============================================================================
aggregator:
build:
context: .
dockerfile: services/aggregator/Dockerfile
environment:
<<: *common-env
GRPC_PORT: 50051
SERVICE_NAME: aggregator
ports:
- "${AGGREGATOR_GRPC_PORT:-50051}:50051"
depends_on:
redis:
condition: service_healthy
timescaledb:
condition: service_healthy
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "/bin/grpc_health_probe", "-addr=:50051"]
deploy:
resources:
limits:
memory: 256M
gateway:
build:
context: .
dockerfile: services/gateway/Dockerfile
environment:
<<: *common-env
HTTP_PORT: 8000
AGGREGATOR_URL: aggregator:50051
SERVICE_NAME: gateway
ports:
- "${GATEWAY_PORT:-8000}:8000"
depends_on:
- aggregator
- redis
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
deploy:
resources:
limits:
memory: 256M
alerts:
build:
context: .
dockerfile: services/alerts/Dockerfile
environment:
<<: *common-env
SERVICE_NAME: alerts
depends_on:
redis:
condition: service_healthy
timescaledb:
condition: service_healthy
healthcheck:
<<: *healthcheck-defaults
test: ["CMD", "python", "-c", "import sys; sys.exit(0)"]
deploy:
resources:
limits:
memory: 128M
# Collector runs separately on each machine being monitored
# For local testing, we run one instance
collector:
build:
context: .
dockerfile: services/collector/Dockerfile
environment:
<<: *common-env
AGGREGATOR_URL: aggregator:50051
MACHINE_ID: ${MACHINE_ID:-local-dev}
COLLECTION_INTERVAL: ${COLLECTION_INTERVAL:-5}
SERVICE_NAME: collector
depends_on:
- aggregator
deploy:
resources:
limits:
memory: 64M
# For actual system metrics, you might need:
# privileged: true
# pid: host
volumes:
redis-data:
timescale-data:
networks:
default:
name: sysmonstm

1
docker-compose.yml Symbolic link
View File

@@ -0,0 +1 @@
ctrl/dev/docker-compose.yml

View File

@@ -32,6 +32,11 @@ logger = setup_logging(
log_format=config.log_format,
)
# Edge forwarding config
EDGE_URL = config.edge_url if hasattr(config, "edge_url") else None
EDGE_API_KEY = config.edge_api_key if hasattr(config, "edge_api_key") else ""
edge_ws = None
# WebSocket connection manager
class ConnectionManager:
@@ -77,6 +82,54 @@ timescale: TimescaleStorage | None = None
grpc_channel: grpc.aio.Channel | None = None
grpc_stub: metrics_pb2_grpc.MetricsServiceStub | None = None
async def connect_to_edge():
"""Maintain persistent WebSocket connection to edge and forward metrics."""
global edge_ws
if not EDGE_URL:
logger.info("edge_not_configured", msg="No EDGE_URL set, running local only")
return
import websockets
url = EDGE_URL
if EDGE_API_KEY:
separator = "&" if "?" in url else "?"
url = f"{url}{separator}key={EDGE_API_KEY}"
while True:
try:
logger.info("edge_connecting", url=EDGE_URL)
async with websockets.connect(url) as ws:
edge_ws = ws
logger.info("edge_connected")
while True:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=30)
# Ignore messages from edge (pings, etc)
except asyncio.TimeoutError:
await ws.ping()
except asyncio.CancelledError:
break
except Exception as e:
edge_ws = None
logger.warning("edge_connection_error", error=str(e))
await asyncio.sleep(5)
async def forward_to_edge(data: dict):
"""Forward metrics to edge if connected."""
global edge_ws
if edge_ws:
try:
await edge_ws.send(json.dumps(data))
except Exception as e:
logger.warning("edge_forward_error", error=str(e))
# Track recent events for internals view
recent_events: list[dict] = []
MAX_RECENT_EVENTS = 100
@@ -133,15 +186,17 @@ async def event_listener():
merged_payload = event.payload
# Broadcast merged data to dashboard
await manager.broadcast(
{
broadcast_msg = {
"type": "metrics",
"data": merged_payload,
"timestamp": event.timestamp.isoformat(),
}
)
await manager.broadcast(broadcast_msg)
service_stats["websocket_broadcasts"] += 1
# Forward to edge if connected
await forward_to_edge(broadcast_msg)
# Broadcast to internals (show raw event, not merged)
await internals_manager.broadcast(
{
@@ -176,6 +231,9 @@ async def lifespan(app: FastAPI):
# Start event listener in background
listener_task = asyncio.create_task(event_listener())
# Start edge connection if configured
edge_task = asyncio.create_task(connect_to_edge())
service_stats["started_at"] = datetime.utcnow().isoformat()
logger.info("gateway_started")
@@ -183,10 +241,15 @@ async def lifespan(app: FastAPI):
# Cleanup
listener_task.cancel()
edge_task.cancel()
try:
await listener_task
except asyncio.CancelledError:
pass
try:
await edge_task
except asyncio.CancelledError:
pass
if grpc_channel:
await grpc_channel.close()

View File

@@ -74,6 +74,10 @@ class GatewayConfig(BaseConfig):
# TimescaleDB - can be set directly via TIMESCALE_URL
timescale_url: str = "postgresql://monitor:monitor@localhost:5432/monitor"
# Edge forwarding (optional - for pushing to cloud edge)
edge_url: str = "" # e.g., wss://sysmonstm.mcrn.ar/ws
edge_api_key: str = ""
class AlertsConfig(BaseConfig):
"""Alerts service configuration."""