
Building sysmonstm: From Idea to Working System

This is the story of building a distributed system monitoring platform. Not a tutorial with sanitized examples, but an explanation of the actual decisions made, the trade-offs considered, and the code that resulted.

System Architecture Overview

The Problem

I have multiple development machines. A workstation, a laptop, sometimes a remote VM. Each one occasionally runs out of disk space, hits memory limits, or has a runaway process eating CPU. The pattern was always the same: something breaks, I SSH in, run htop, realize the problem, fix it.

The obvious solution is a monitoring dashboard. Something that shows all machines in one place, updates in real-time, and alerts before things break.

But the real motivation was an interview. The job description mentioned gRPC, streaming patterns, event-driven architecture. Building a monitoring system would demonstrate all of these while solving an actual problem.

Architecture Decisions

Why gRPC Instead of REST

REST would work fine. Poll each machine every few seconds, aggregate the results. Simple.

But gRPC offers streaming. Instead of the aggregator asking each machine "what are your metrics right now?", each machine opens a persistent connection and continuously pushes metrics. This is more efficient (one long-lived connection instead of repeated requests) and has lower latency (metrics arrive as soon as they're collected).

The proto definition in proto/metrics.proto defines this as client-side streaming:

service MetricsService {
  // Client-side streaming: collector streams metrics to aggregator
  rpc StreamMetrics(stream Metric) returns (StreamAck) {}
}

The collector is the client. It streams metrics. The aggregator is the server. It receives them. When the stream ends (collector shuts down, network drops), the aggregator sends a single StreamAck response back to the collector.

[Diagram: gRPC Streaming Pattern]

Why This Storage Tier Approach

Metrics have different access patterns at different ages:

  • Right now: The dashboard needs current CPU/memory/disk for all machines. Access pattern: read all, very frequently.
  • Last hour: Graphs showing recent trends. Access pattern: read range, somewhat frequently.
  • Last week: Investigating what happened yesterday. Access pattern: read range, occasionally.
  • Last month: Capacity planning. Access pattern: aggregated queries, rarely.

Storing everything in one place forces a choice between fast reads (keep it all in memory) and storage efficiency (keep it on disk). The solution is tiered storage:

  • Redis (services/aggregator/storage.py): Current state only. Each machine's latest metrics, with 5-minute TTL. Dashboard reads hit Redis.
  • TimescaleDB (scripts/init-db.sql): Historical data. Raw metrics at 5-second resolution for 24 hours, then automatically downsampled to 1-minute and 1-hour aggregates with longer retention.

The aggregator writes to both on every batch. Redis for live dashboard. TimescaleDB for history.

[Diagram: Storage Tiers]

Why Event-Driven for Alerts

The alerts service needs to evaluate every metric against threshold rules. Two options:

  1. Direct call: Aggregator calls alerts service for each metric batch.
  2. Event stream: Aggregator publishes events. Alerts service subscribes.

Option 2 decouples them. The aggregator doesn't know or care if the alerts service is running. It publishes events regardless. The alerts service can be restarted, scaled, or replaced without touching the aggregator.

The event abstraction in shared/events/base.py defines the interface:

class EventPublisher(ABC):
    @abstractmethod
    async def publish(self, topic: str, payload: dict[str, Any], **kwargs) -> str:
        pass

class EventSubscriber(ABC):
    @abstractmethod
    async def consume(self) -> AsyncIterator[Event]:
        pass

Currently backed by Redis Pub/Sub (shared/events/redis_pubsub.py). The abstraction means switching to Kafka or RabbitMQ later requires implementing a new backend, not changing any service code.
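To make that swap concrete, here is a second backend in miniature: an in-memory bus with the same publish/consume shape. This is purely illustrative (the real interface lives in shared/events/base.py and the real backend in shared/events/redis_pubsub.py); services written against the interface would run against either.

```python
import asyncio
import uuid
from typing import Any, AsyncIterator


class InMemoryBus:
    """Toy backend with the same publish/consume shape as the Redis one."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def publish(self, topic: str, payload: dict[str, Any], **kwargs) -> str:
        event_id = str(uuid.uuid4())
        await self._queue.put({"id": event_id, "topic": topic, "payload": payload})
        return event_id

    async def consume(self) -> AsyncIterator[dict]:
        while True:
            yield await self._queue.get()


async def demo() -> tuple[str, float]:
    bus = InMemoryBus()
    await bus.publish("metrics.raw", {"machine_id": "ws", "metrics": {"CPU_PERCENT": 42.0}})
    async for event in bus.consume():
        # First event received; a real subscriber would loop forever.
        return event["topic"], event["payload"]["metrics"]["CPU_PERCENT"]
```

Running `asyncio.run(demo())` round-trips one event through the bus and returns `("metrics.raw", 42.0)`.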

[Diagram: Event-Driven Architecture]

Phase 1: MVP - Getting Streaming to Work

The goal was simple: run a collector, see metrics appear in the aggregator's logs.

The Collector

services/collector/main.py is a gRPC client. The core is an async generator that yields metrics forever:

async def _metric_generator(self):
    """Async generator that yields metrics at the configured interval."""
    while self.running:
        batch = self.collector.collect()
        protos = self._batch_to_proto(batch)

        for proto in protos:
            yield proto

        await asyncio.sleep(self.config.collection_interval)

This generator is passed directly to the gRPC stub:

response = await self.stub.StreamMetrics(self._metric_generator())

The gRPC library handles the streaming. Each yield sends a message. The connection stays open until the generator stops or the network fails.

The actual metric collection happens in services/collector/metrics.py using psutil:

def _collect_cpu(self) -> list[MetricValue]:
    metrics = []
    cpu_percent = psutil.cpu_percent(interval=None)
    metrics.append(MetricValue("CPU_PERCENT", cpu_percent))
    
    per_core = psutil.cpu_percent(interval=None, percpu=True)
    for i, pct in enumerate(per_core):
        metrics.append(MetricValue(
            "CPU_PERCENT_PER_CORE", 
            pct, 
            {"core": str(i)}
        ))
    return metrics

The Aggregator

services/aggregator/main.py is a gRPC server. The StreamMetrics method receives the stream:

async def StreamMetrics(self, request_iterator, context):
    metrics_received = 0
    current_batch: list[tuple[str, float, dict]] = []

    async for metric in request_iterator:
        metrics_received += 1
        
        metric_type = metrics_pb2.MetricType.Name(metric.type)
        current_batch.append((metric_type, metric.value, dict(metric.labels)))

        if len(current_batch) >= 20:
            await self._flush_batch(...)
            current_batch = []

The request_iterator is an async iterator over incoming metrics. The async for loop processes them as they arrive. Batching (flush every 20 metrics) reduces storage writes.
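One subtlety the excerpt glosses over: when the stream closes, current_batch may still hold up to 19 metrics, so the handler also needs a final flush after the loop. The batching logic in isolation, as a sketch (flush here is a stand-in for _flush_batch):

```python
import asyncio


async def drain(request_iterator, flush, batch_size: int = 20) -> None:
    """Batch items from an async iterator, flushing every batch_size items
    and once more at the end for the final partial batch."""
    batch = []
    async for metric in request_iterator:
        batch.append(metric)
        if len(batch) >= batch_size:
            await flush(batch)
            batch = []
    if batch:  # Stream ended mid-batch - don't drop the remainder
        await flush(batch)


async def fake_stream(n: int):
    # Stand-in for the gRPC request_iterator.
    for i in range(n):
        yield i


flushed: list[int] = []


async def flush(batch: list) -> None:
    flushed.append(len(batch))


asyncio.run(drain(fake_stream(45), flush))
```

With 45 metrics and a batch size of 20, flushed ends up as [20, 20, 5]: two full batches plus the tail.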

Retry Logic

Networks fail. The collector needs to reconnect. The pattern is exponential backoff:

retry_count = 0
max_retries = 10
base_delay = 1.0

while self.running:
    try:
        await self.stub.StreamMetrics(self._metric_generator())
        retry_count = 0  # Success - reset counter
    except grpc.aio.AioRpcError as e:
        retry_count += 1
        if retry_count >= max_retries:
            break  # Too many consecutive failures - give up
        delay = min(base_delay * (2**retry_count), 60.0)  # Cap at 60 seconds
        await asyncio.sleep(delay)
        await self.disconnect()
        await self.connect()

First failure waits 2 seconds. Second waits 4. Third waits 8. Capped at 60 seconds. After 10 failures, give up.
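The full schedule is just min(1.0 × 2ⁿ, 60) for the nth consecutive failure:

```python
base_delay = 1.0

# Delay before retry n (n = 1..10), capped at 60 seconds.
delays = [min(base_delay * (2**n), 60.0) for n in range(1, 11)]
```

This yields [2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0, 60.0, 60.0]: the cap kicks in at the sixth failure.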

Phase 2: Dashboard - Making It Visible

Metrics in logs are useless. A dashboard makes them useful.

The Gateway

services/gateway/main.py is a FastAPI application serving two purposes:

  1. REST API: Query current and historical metrics
  2. WebSocket: Push real-time updates to browsers

The WebSocket connection manager (services/gateway/main.py:40-67) tracks active connections:

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def broadcast(self, message: dict) -> None:
        data = json.dumps(message)
        for connection in self.active_connections:
            await connection.send_text(data)
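One caveat with broadcast as written: send_text raises once a browser has disconnected, which would abort the loop mid-broadcast. A more defensive variant (a sketch, not the gateway's actual code) collects dead connections and prunes them:

```python
import json


async def broadcast_safe(connections: list, message: dict) -> None:
    data = json.dumps(message)
    dead = []
    for ws in connections:
        try:
            await ws.send_text(data)
        except Exception:
            dead.append(ws)  # Client went away mid-broadcast
    for ws in dead:  # Prune outside the send loop
        connections.remove(ws)
```

Healthy clients still receive the message even when one connection in the middle of the list has died.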

Event to WebSocket Bridge

The gateway subscribes to the same event stream as alerts. When a metric event arrives, it broadcasts to all connected browsers:

async def event_listener():
    async with get_subscriber(topics=["metrics.raw", "alerts.*"]) as subscriber:
        async for event in subscriber.consume():
            await manager.broadcast({
                "type": "metrics",
                "data": event.payload,
                "timestamp": event.timestamp.isoformat(),
            })

This runs as a background task, started in the FastAPI lifespan handler (services/gateway/main.py:145-175).

Handling Partial Batches

The aggregator batches metrics (flush every 20). This means a single collection cycle might arrive as multiple events. The dashboard needs complete machine state, not partial updates.

Solution: merge incoming metrics into a cache (services/gateway/main.py:108-120):

machine_metrics_cache: dict[str, dict] = {}

# In event_listener:
machine_id = event.payload.get("machine_id", "")
incoming_metrics = event.payload.get("metrics", {})

if machine_id not in machine_metrics_cache:
    machine_metrics_cache[machine_id] = {}
machine_metrics_cache[machine_id].update(incoming_metrics)

New metrics merge with existing. The broadcast includes the full merged state.
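A quick illustration of the merge semantics, with illustrative metric values:

```python
machine_metrics_cache: dict[str, dict] = {}

events = [
    {"machine_id": "ws", "metrics": {"CPU_PERCENT": 40.0}},
    {"machine_id": "ws", "metrics": {"MEM_PERCENT": 61.0}},  # different batch
    {"machine_id": "ws", "metrics": {"CPU_PERCENT": 55.0}},  # newer value wins
]

for event in events:
    # setdefault is equivalent to the explicit membership check above.
    machine_metrics_cache.setdefault(event["machine_id"], {}).update(event["metrics"])
```

After the three events the cache holds {"ws": {"CPU_PERCENT": 55.0, "MEM_PERCENT": 61.0}}: later values overwrite, unrelated keys survive.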

Edge Relay - Public Dashboard Without the Cost

The full stack (aggregator, Redis, TimescaleDB) runs on local hardware. But the dashboard needs to be publicly accessible at sysmonstm.mcrn.ar. Running the full stack on AWS would be expensive and unnecessary.

The solution is an edge relay (ctrl/edge/edge.py). It's a minimal FastAPI app that does one thing: relay WebSocket messages. The gateway forwards metrics to the edge via WebSocket, and the edge broadcasts them to connected browsers:

# Gateway forwards to edge when EDGE_URL is configured
async def forward_to_edge(data: dict):
    if edge_ws:
        await edge_ws.send(json.dumps(data))

The edge receives these and broadcasts to all dashboard viewers:

@app.websocket("/ws")
async def dashboard_ws(websocket: WebSocket):
    await websocket.accept()
    clients.add(websocket)
    # ... broadcasts incoming metrics to all clients

This keeps heavy processing (gRPC, storage, event evaluation) on local hardware and puts only a lightweight relay in the cloud. The AWS instance has no databases, no gRPC, no storage — just WebSocket in, WebSocket out.

Phase 3: Alerts - Adding Intelligence

The alerts service subscribes to metric events and evaluates them against rules.

Rule Evaluation

services/alerts/main.py defines an AlertEvaluator class:

class AlertEvaluator:
    OPERATORS = {
        "gt": lambda v, t: v > t,
        "lt": lambda v, t: v < t,
        "gte": lambda v, t: v >= t,
        "lte": lambda v, t: v <= t,
        "eq": lambda v, t: v == t,
    }

    def evaluate(self, machine_id: str, metrics: dict[str, float]) -> list[Alert]:
        new_alerts = []
        for metric_type, value in metrics.items():
            rule = self.rules.get(metric_type)
            if not rule:
                continue
            
            op_func = self.OPERATORS.get(rule.operator)
            if op_func(value, rule.threshold):
                # Threshold exceeded
                new_alerts.append(Alert(...))
        return new_alerts

Avoiding Duplicate Alerts

If CPU stays above 80% for an hour, we want one alert, not 720 (one per 5-second check).

The evaluator tracks active alerts:

self.active_alerts: dict[str, Alert] = {}  # key: f"{machine_id}:{rule_name}"

# In evaluate():
alert_key = f"{machine_id}:{rule.name}"
if op_func(value, rule.threshold):
    if alert_key not in self.active_alerts:
        # New alert - trigger it
        self.active_alerts[alert_key] = alert
        new_alerts.append(alert)
    # Otherwise already active - ignore
else:
    # Threshold no longer exceeded - resolve
    if alert_key in self.active_alerts:
        del self.active_alerts[alert_key]

New alert only triggers if not already in active_alerts. When the metric drops below threshold, the alert is removed and can trigger again later.
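The whole lifecycle can be exercised in a few lines. This is a self-contained sketch with a hard-coded rule name and a greater-than operator standing in for the real Rule objects:

```python
active_alerts: dict[str, str] = {}


def check(machine_id: str, rule_name: str, value: float, threshold: float) -> list[str]:
    fired = []
    key = f"{machine_id}:{rule_name}"
    if value > threshold:
        if key not in active_alerts:
            active_alerts[key] = rule_name  # New alert: remember and fire
            fired.append(key)
        # Already active: suppress the duplicate
    else:
        active_alerts.pop(key, None)  # Recovered: future breaches fire again
    return fired


# An hour of 5-second checks above threshold produces exactly one alert.
fired_total = sum(len(check("ws", "cpu_high", 95.0, 80.0)) for _ in range(720))

# After the metric recovers, the same rule can fire again.
check("ws", "cpu_high", 40.0, 80.0)
refired = check("ws", "cpu_high", 95.0, 80.0)
```

fired_total is 1 despite 720 breaches, and refired contains the alert key again after the recovery.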

Phase 4: Polish - Production Patterns

Structured Logging

Every service uses shared/logging.py for structured JSON logging:

logger.info(
    "stream_completed",
    machine_id=current_machine,
    metrics_received=metrics_received,
)

Output:

{"event": "stream_completed", "machine_id": "workstation", "metrics_received": 1500, "timestamp": "..."}

This is searchable. "Show me all logs where metrics_received > 1000" is a simple query.
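shared/logging.py itself isn't shown here; as a rough stdlib-only approximation of the same idea (an event name plus key/value fields, rendered as one JSON object per line):

```python
import json
import logging
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "event": record.getMessage(),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # Key/value fields attached via the `extra` mechanism below.
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry)


def log_event(logger: logging.Logger, event: str, **fields) -> None:
    # Mimics the structured call shown above: event name + keyword fields.
    logger.info(event, extra={"fields": fields})


logger = logging.getLogger("aggregator")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

log_event(logger, "stream_completed", machine_id="workstation", metrics_received=1500)
```

Each call emits one JSON line, so log aggregators can filter on any field without regex gymnastics.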

Health Checks

Every service has health endpoints. The aggregator uses gRPC health checking (services/aggregator/main.py:236-240):

health_servicer = health.HealthServicer()
health_servicer.set("", health_pb2.HealthCheckResponse.SERVING)
health_servicer.set("MetricsService", health_pb2.HealthCheckResponse.SERVING)
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, self.server)

The gateway has HTTP health endpoints (services/gateway/main.py:197-216):

@app.get("/ready")
async def readiness_check():
    checks = {"gateway": "ok"}
    
    try:
        await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=2.0)
        checks["aggregator"] = "ok"
    except Exception as e:
        checks["aggregator"] = f"error: {str(e)}"
    
    return {"status": "ready", "checks": checks}

Graceful Degradation

The aggregator continues streaming even if storage fails (services/aggregator/main.py:137-152):

try:
    await self.redis.update_machine_state(...)
except Exception as e:
    self.logger.warning("redis_update_failed", error=str(e))
    # Don't re-raise - continue processing

try:
    await self.timescale.insert_metrics(...)
except Exception as e:
    self.logger.warning("timescale_insert_failed", error=str(e))
    # Don't re-raise - continue processing

Redis down? Metrics still flow to TimescaleDB. TimescaleDB down? Metrics still flow to the event stream. This keeps the system partially functional during partial failures.

Configuration

All configuration uses Pydantic with environment variable support (shared/config.py):

class CollectorConfig(BaseSettings):
    machine_id: str = Field(default_factory=lambda: socket.gethostname())
    aggregator_url: str = "aggregator:50051"
    collection_interval: int = 5
    
    model_config = SettingsConfigDict(env_prefix="COLLECTOR_")

Set COLLECTOR_AGGREGATOR_URL=192.168.1.100:50051 and it overrides the default. No code changes for different environments.
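Pydantic does the environment lookup itself; as a rough stdlib approximation of what env_prefix does (illustrative only, not the real shared/config.py):

```python
import os
import socket
from dataclasses import dataclass, field


def _env(prefix: str, name: str, default):
    # BaseSettings-style lookup: PREFIX + FIELD_NAME, upper-cased.
    raw = os.environ.get(f"{prefix}{name.upper()}")
    return default if raw is None else type(default)(raw)


@dataclass
class CollectorConfig:
    machine_id: str = field(default_factory=socket.gethostname)
    aggregator_url: str = "aggregator:50051"
    collection_interval: int = 5

    def __post_init__(self):
        self.aggregator_url = _env("COLLECTOR_", "aggregator_url", self.aggregator_url)
        self.collection_interval = _env("COLLECTOR_", "collection_interval", self.collection_interval)


os.environ["COLLECTOR_AGGREGATOR_URL"] = "192.168.1.100:50051"
config = CollectorConfig()
```

The environment variable overrides the default; fields without a matching variable (collection_interval here) keep their defaults.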

What Worked

The event abstraction. Adding a new consumer (like the gateway's WebSocket bridge) required zero changes to the aggregator. Subscribe to the topic, process events.

Tiered storage. Redis handles the hot path (dashboard reads). TimescaleDB handles history. Each optimized for its access pattern.

Graceful degradation. During development, I regularly restarted individual services. The system stayed partially functional throughout.

What Could Be Better

No backpressure. If the aggregator falls behind, events accumulate in memory. A production system would need flow control.

Alert rules are database-only. Changing thresholds requires database updates. A proper config management system would be better.

No authentication. The gRPC channels are insecure. Production would need TLS and service authentication.

Key Files Reference

  • Proto definitions (proto/metrics.proto): gRPC service and message definitions
  • Collector main (services/collector/main.py): gRPC client, streaming logic
  • Metric collection (services/collector/metrics.py): psutil wrappers
  • Aggregator main (services/aggregator/main.py): gRPC server, batch processing
  • Storage layer (services/aggregator/storage.py): Redis + TimescaleDB abstraction
  • Gateway main (services/gateway/main.py): FastAPI, WebSocket, event bridge
  • Alerts main (services/alerts/main.py): event subscription, rule evaluation
  • Event abstraction (shared/events/base.py): publisher/subscriber interfaces
  • Redis events (shared/events/redis_pubsub.py): Redis Pub/Sub implementation
  • Configuration (shared/config.py): Pydantic settings for all services
  • DB initialization (scripts/init-db.sql): TimescaleDB schema, hypertables
  • Edge relay (ctrl/edge/edge.py): WebSocket relay for AWS dashboard
  • Docker setup (ctrl/dev/docker-compose.yml): full stack orchestration

Running It

docker compose up

Open http://localhost:8000 for the dashboard. Metrics appear within seconds.

To add another machine, run the collector pointed at your aggregator:

COLLECTOR_AGGREGATOR_URL=your-server:50051 python services/collector/main.py

It connects, starts streaming, and appears on the dashboard.