No machines connected
+Waiting for collectors to send metrics...
+diff --git a/ctlptl.yaml b/ctlptl.yaml deleted file mode 100644 index b81ba07..0000000 --- a/ctlptl.yaml +++ /dev/null @@ -1,32 +0,0 @@ -# ctlptl configuration for Kind cluster -# Usage: ctlptl apply -f ctlptl.yaml - -apiVersion: ctlptl.dev/v1alpha1 -kind: Registry -name: sysmonstm-registry -port: 5005 ---- -apiVersion: ctlptl.dev/v1alpha1 -kind: Cluster -product: kind -registry: sysmonstm-registry -kindV1Alpha4Cluster: - name: sysmonstm - nodes: - - role: control-plane - extraPortMappings: - # Gateway HTTP - - containerPort: 30080 - hostPort: 8080 - protocol: TCP - # Aggregator gRPC - - containerPort: 30051 - hostPort: 50051 - protocol: TCP - # Resource limits for t2.small compatibility - kubeadmConfigPatches: - - | - kind: InitConfiguration - nodeRegistration: - kubeletExtraArgs: - system-reserved: memory=256Mi diff --git a/scripts/generate-proto.sh b/scripts/generate-proto.sh new file mode 100755 index 0000000..27a30fb --- /dev/null +++ b/scripts/generate-proto.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Generate Python gRPC code from proto definitions + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$SCRIPT_DIR/.." + +cd "$PROJECT_ROOT" + +echo "Generating Python gRPC code from proto/metrics.proto..." 
+ +python -m grpc_tools.protoc \ + -I./proto \ + --python_out=./shared \ + --grpc_python_out=./shared \ + ./proto/metrics.proto + +# Fix imports in generated files (grpc_tools generates incorrect imports) +sed -i 's/import metrics_pb2/from shared import metrics_pb2/' shared/metrics_pb2_grpc.py + +echo "Generated:" +echo " - shared/metrics_pb2.py" +echo " - shared/metrics_pb2_grpc.py" diff --git a/services/aggregator/__init__.py b/services/aggregator/__init__.py new file mode 100644 index 0000000..39c0db5 --- /dev/null +++ b/services/aggregator/__init__.py @@ -0,0 +1 @@ +"""Aggregator service.""" diff --git a/services/aggregator/main.py b/services/aggregator/main.py new file mode 100644 index 0000000..9d04649 --- /dev/null +++ b/services/aggregator/main.py @@ -0,0 +1,361 @@ +"""Aggregator service - gRPC server that receives metrics and stores them.""" + +import asyncio +import signal +import sys +from pathlib import Path + +import grpc +from grpc_health.v1 import health, health_pb2, health_pb2_grpc + +# Add project root to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from services.aggregator.storage import RedisStorage, TimescaleStorage +from shared import metrics_pb2, metrics_pb2_grpc +from shared.config import get_aggregator_config +from shared.events import get_publisher +from shared.logging import setup_logging + + +class MetricsServicer(metrics_pb2_grpc.MetricsServiceServicer): + """gRPC servicer for metrics ingestion.""" + + def __init__( + self, + redis_storage: RedisStorage, + timescale_storage: TimescaleStorage, + event_publisher, + logger, + ): + self.redis = redis_storage + self.timescale = timescale_storage + self.publisher = event_publisher + self.logger = logger + + async def StreamMetrics(self, request_iterator, context): + """Receive streaming metrics from a collector.""" + metrics_received = 0 + current_machine = None + current_batch: list[tuple[str, float, dict]] = [] + batch_timestamp = 0 + batch_hostname = "" 
+ + try: + async for metric in request_iterator: + metrics_received += 1 + + # Track current machine + if current_machine != metric.machine_id: + # Flush previous batch if switching machines + if current_machine and current_batch: + await self._flush_batch( + current_machine, + batch_hostname, + batch_timestamp, + current_batch, + ) + current_batch = [] + + current_machine = metric.machine_id + self.logger.info( + "collector_connected", + machine_id=metric.machine_id, + hostname=metric.hostname, + ) + + # Get metric type name + metric_type = metrics_pb2.MetricType.Name(metric.type) + + # Add to batch + current_batch.append( + ( + metric_type, + metric.value, + dict(metric.labels), + ) + ) + batch_timestamp = metric.timestamp_ms + batch_hostname = metric.hostname + + # Flush batch every 20 metrics or if timestamp changes significantly + if len(current_batch) >= 20: + await self._flush_batch( + current_machine, batch_hostname, batch_timestamp, current_batch + ) + current_batch = [] + + # Flush remaining + if current_machine and current_batch: + await self._flush_batch( + current_machine, batch_hostname, batch_timestamp, current_batch + ) + + self.logger.info( + "stream_completed", + machine_id=current_machine, + metrics_received=metrics_received, + ) + + return metrics_pb2.StreamAck( + success=True, + metrics_received=metrics_received, + message="OK", + ) + + except Exception as e: + self.logger.error( + "stream_error", + error=str(e), + machine_id=current_machine, + metrics_received=metrics_received, + ) + return metrics_pb2.StreamAck( + success=False, + metrics_received=metrics_received, + message=str(e), + ) + + async def _flush_batch( + self, + machine_id: str, + hostname: str, + timestamp_ms: int, + batch: list[tuple[str, float, dict]], + ) -> None: + """Flush a batch of metrics to storage and events.""" + + # Aggregate metrics for Redis state + metrics_dict = {} + for metric_type, value, labels in batch: + key = metric_type + if labels: + key = 
f"{metric_type}:{','.join(f'{k}={v}' for k, v in labels.items())}" + metrics_dict[key] = value + + # Update Redis (current state) + await self.redis.update_machine_state( + machine_id=machine_id, + hostname=hostname, + metrics=metrics_dict, + timestamp_ms=timestamp_ms, + ) + + # Insert into TimescaleDB (historical) + try: + await self.timescale.insert_metrics( + machine_id=machine_id, + hostname=hostname, + timestamp_ms=timestamp_ms, + metrics=batch, + ) + except Exception as e: + self.logger.warning("timescale_insert_failed", error=str(e)) + + # Update machine registry + try: + await self.timescale.update_machine_registry( + machine_id=machine_id, + hostname=hostname, + ) + except Exception as e: + self.logger.warning("machine_registry_update_failed", error=str(e)) + + # Publish event for subscribers (alerts, gateway) + await self.publisher.publish( + topic="metrics.raw", + payload={ + "machine_id": machine_id, + "hostname": hostname, + "timestamp_ms": timestamp_ms, + "metrics": metrics_dict, + }, + ) + + self.logger.debug( + "batch_flushed", + machine_id=machine_id, + count=len(batch), + ) + + async def GetCurrentState(self, request, context): + """Get current state for a single machine.""" + state = await self.redis.get_machine_state(request.machine_id) + + if not state: + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details(f"Machine {request.machine_id} not found") + return metrics_pb2.MachineState() + + # Convert state to proto + metrics = [] + for key, value in state.get("metrics", {}).items(): + parts = key.split(":") + metric_type_str = parts[0] + labels = {} + if len(parts) > 1: + for pair in parts[1].split(","): + k, v = pair.split("=") + labels[k] = v + + metric_type = getattr(metrics_pb2, metric_type_str, 0) + metrics.append( + metrics_pb2.Metric( + machine_id=state["machine_id"], + hostname=state["hostname"], + timestamp_ms=state["last_seen_ms"], + type=metric_type, + value=value, + labels=labels, + ) + ) + + return 
metrics_pb2.MachineState( + machine_id=state["machine_id"], + hostname=state["hostname"], + last_seen_ms=state["last_seen_ms"], + current_metrics=metrics, + health=metrics_pb2.HEALTHY, + ) + + async def GetAllStates(self, request, context): + """Get current state for all machines.""" + states = await self.redis.get_all_machines() + + machine_states = [] + for state in states: + metrics = [] + for key, value in state.get("metrics", {}).items(): + parts = key.split(":") + metric_type_str = parts[0] + metric_type = getattr(metrics_pb2, metric_type_str, 0) + metrics.append( + metrics_pb2.Metric( + machine_id=state["machine_id"], + hostname=state["hostname"], + timestamp_ms=state["last_seen_ms"], + type=metric_type, + value=value, + ) + ) + + machine_states.append( + metrics_pb2.MachineState( + machine_id=state["machine_id"], + hostname=state["hostname"], + last_seen_ms=state["last_seen_ms"], + current_metrics=metrics, + health=metrics_pb2.HEALTHY, + ) + ) + + return metrics_pb2.AllMachinesState(machines=machine_states) + + +class AggregatorService: + """Main aggregator service.""" + + def __init__(self): + self.config = get_aggregator_config() + self.logger = setup_logging( + service_name=self.config.service_name, + log_level=self.config.log_level, + log_format=self.config.log_format, + ) + + self.redis = RedisStorage(self.config.redis_url) + self.timescale = TimescaleStorage(self.config.timescale_url) + self.publisher = get_publisher(source="aggregator") + + self.server: grpc.aio.Server | None = None + self.running = False + + async def start(self) -> None: + """Start the gRPC server.""" + self.running = True + + # Connect to storage + await self.redis.connect() + + try: + await self.timescale.connect() + except Exception as e: + self.logger.warning( + "timescale_connection_failed", + error=str(e), + message="Continuing without TimescaleDB - metrics won't be persisted", + ) + + # Connect to event publisher + await self.publisher.connect() + + # Create gRPC server + 
self.server = grpc.aio.server() + + # Add metrics servicer + servicer = MetricsServicer( + redis_storage=self.redis, + timescale_storage=self.timescale, + event_publisher=self.publisher, + logger=self.logger, + ) + metrics_pb2_grpc.add_MetricsServiceServicer_to_server(servicer, self.server) + + # Add health check servicer + health_servicer = health.HealthServicer() + health_servicer.set("", health_pb2.HealthCheckResponse.SERVING) + health_servicer.set("MetricsService", health_pb2.HealthCheckResponse.SERVING) + health_pb2_grpc.add_HealthServicer_to_server(health_servicer, self.server) + + # Start server + listen_addr = f"[::]:{self.config.grpc_port}" + self.server.add_insecure_port(listen_addr) + + await self.server.start() + + self.logger.info( + "aggregator_started", + port=self.config.grpc_port, + listen_addr=listen_addr, + ) + + async def stop(self) -> None: + """Stop the gRPC server.""" + self.running = False + + if self.server: + await self.server.stop(grace=5) + self.server = None + + await self.publisher.disconnect() + await self.timescale.disconnect() + await self.redis.disconnect() + + self.logger.info("aggregator_stopped") + + async def wait(self) -> None: + """Wait for the server to terminate.""" + if self.server: + await self.server.wait_for_termination() + + +async def main(): + """Main entry point.""" + service = AggregatorService() + + # Handle shutdown signals + loop = asyncio.get_event_loop() + + async def shutdown(): + service.logger.info("shutdown_signal_received") + await service.stop() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown())) + + await service.start() + await service.wait() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/aggregator/storage.py b/services/aggregator/storage.py new file mode 100644 index 0000000..1ea44a8 --- /dev/null +++ b/services/aggregator/storage.py @@ -0,0 +1,245 @@ +"""Storage layer for metrics - Redis (current state) 
and TimescaleDB (historical).""" + +import json +import time +from datetime import datetime +from typing import Any + +import asyncpg +import redis.asyncio as redis + +from shared.logging import get_logger + +logger = get_logger("storage") + + +class RedisStorage: + """Redis storage for current machine state.""" + + def __init__(self, redis_url: str): + self.redis_url = redis_url + self._client: redis.Redis | None = None + + async def connect(self) -> None: + self._client = redis.from_url(self.redis_url, decode_responses=True) + await self._client.ping() + logger.info("redis_connected", url=self.redis_url) + + async def disconnect(self) -> None: + if self._client: + await self._client.close() + self._client = None + logger.info("redis_disconnected") + + async def update_machine_state( + self, + machine_id: str, + hostname: str, + metrics: dict[str, float], + timestamp_ms: int, + ) -> None: + """Update the current state for a machine.""" + if not self._client: + raise RuntimeError("Not connected to Redis") + + state = { + "machine_id": machine_id, + "hostname": hostname, + "last_seen_ms": timestamp_ms, + "metrics": metrics, + "updated_at": datetime.utcnow().isoformat(), + } + + # Store as hash for efficient partial reads + key = f"machine:{machine_id}" + await self._client.hset( + key, + mapping={ + "state": json.dumps(state), + "last_seen": str(timestamp_ms), + }, + ) + + # Set expiry - if no updates for 5 minutes, consider stale + await self._client.expire(key, 300) + + # Add to active machines set + await self._client.sadd("machines:active", machine_id) + + async def get_machine_state(self, machine_id: str) -> dict[str, Any] | None: + """Get current state for a machine.""" + if not self._client: + raise RuntimeError("Not connected to Redis") + + key = f"machine:{machine_id}" + data = await self._client.hget(key, "state") + + if data: + return json.loads(data) + return None + + async def get_all_machines(self) -> list[dict[str, Any]]: + """Get current state for 
all active machines.""" + if not self._client: + raise RuntimeError("Not connected to Redis") + + machine_ids = await self._client.smembers("machines:active") + states = [] + + for machine_id in machine_ids: + state = await self.get_machine_state(machine_id) + if state: + states.append(state) + else: + # Remove stale machine from active set + await self._client.srem("machines:active", machine_id) + + return states + + +class TimescaleStorage: + """TimescaleDB storage for historical metrics.""" + + def __init__(self, connection_url: str): + self.connection_url = connection_url + self._pool: asyncpg.Pool | None = None + + async def connect(self) -> None: + self._pool = await asyncpg.create_pool( + self.connection_url, + min_size=2, + max_size=10, + ) + logger.info("timescaledb_connected") + + async def disconnect(self) -> None: + if self._pool: + await self._pool.close() + self._pool = None + logger.info("timescaledb_disconnected") + + async def insert_metrics( + self, + machine_id: str, + hostname: str, + timestamp_ms: int, + metrics: list[tuple[str, float, dict[str, str]]], + ) -> int: + """ + Insert a batch of metrics. 
+ + Args: + machine_id: Machine identifier + hostname: Machine hostname + timestamp_ms: Timestamp in milliseconds + metrics: List of (metric_type, value, labels) tuples + + Returns: + Number of rows inserted + """ + if not self._pool: + raise RuntimeError("Not connected to TimescaleDB") + + timestamp = datetime.utcfromtimestamp(timestamp_ms / 1000) + + # Prepare batch insert + rows = [ + (timestamp, machine_id, hostname, metric_type, value, json.dumps(labels)) + for metric_type, value, labels in metrics + ] + + async with self._pool.acquire() as conn: + await conn.executemany( + """ + INSERT INTO metrics_raw (time, machine_id, hostname, metric_type, value, labels) + VALUES ($1, $2, $3, $4, $5, $6) + """, + rows, + ) + + return len(rows) + + async def update_machine_registry( + self, + machine_id: str, + hostname: str, + health: str = "HEALTHY", + ) -> None: + """Update the machines registry with last seen time.""" + if not self._pool: + raise RuntimeError("Not connected to TimescaleDB") + + async with self._pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO machines (machine_id, hostname, last_seen, health) + VALUES ($1, $2, NOW(), $3) + ON CONFLICT (machine_id) DO UPDATE + SET hostname = $2, last_seen = NOW(), health = $3 + """, + machine_id, + hostname, + health, + ) + + async def get_metrics( + self, + machine_id: str | None = None, + metric_type: str | None = None, + start_time: datetime | None = None, + end_time: datetime | None = None, + limit: int = 1000, + ) -> list[dict[str, Any]]: + """Query historical metrics.""" + if not self._pool: + raise RuntimeError("Not connected to TimescaleDB") + + conditions = [] + params = [] + param_idx = 1 + + if machine_id: + conditions.append(f"machine_id = ${param_idx}") + params.append(machine_id) + param_idx += 1 + + if metric_type: + conditions.append(f"metric_type = ${param_idx}") + params.append(metric_type) + param_idx += 1 + + if start_time: + conditions.append(f"time >= ${param_idx}") + 
params.append(start_time) + param_idx += 1 + + if end_time: + conditions.append(f"time <= ${param_idx}") + params.append(end_time) + param_idx += 1 + + where_clause = " AND ".join(conditions) if conditions else "TRUE" + + query = f""" + SELECT time, machine_id, hostname, metric_type, value, labels + FROM metrics_raw + WHERE {where_clause} + ORDER BY time DESC + LIMIT ${param_idx} + """ + params.append(limit) + + async with self._pool.acquire() as conn: + rows = await conn.fetch(query, *params) + + return [ + { + "time": row["time"].isoformat(), + "machine_id": row["machine_id"], + "hostname": row["hostname"], + "metric_type": row["metric_type"], + "value": row["value"], + "labels": json.loads(row["labels"]) if row["labels"] else {}, + } + for row in rows + ] diff --git a/services/alerts/Dockerfile b/services/alerts/Dockerfile index d1300a9..547bca8 100644 --- a/services/alerts/Dockerfile +++ b/services/alerts/Dockerfile @@ -14,6 +14,12 @@ RUN pip install --no-cache-dir -r requirements.txt COPY shared /app/shared COPY proto /app/proto +RUN python -m grpc_tools.protoc \ + -I/app/proto \ + --python_out=/app/shared \ + --grpc_python_out=/app/shared \ + /app/proto/metrics.proto + COPY services/alerts /app/services/alerts ENV PYTHONPATH=/app diff --git a/services/alerts/__init__.py b/services/alerts/__init__.py new file mode 100644 index 0000000..b634a21 --- /dev/null +++ b/services/alerts/__init__.py @@ -0,0 +1 @@ +"""Alerts service.""" diff --git a/services/alerts/main.py b/services/alerts/main.py new file mode 100644 index 0000000..e6c897b --- /dev/null +++ b/services/alerts/main.py @@ -0,0 +1,317 @@ +"""Alerts service - subscribes to metrics events and evaluates thresholds.""" + +import asyncio +import signal +import sys +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any + +import asyncpg + +# Add project root to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from 
shared.config import get_alerts_config +from shared.events import get_publisher, get_subscriber +from shared.logging import setup_logging + + +@dataclass +class AlertRule: + """An alert rule configuration.""" + + id: int + name: str + metric_type: str + operator: str # gt, lt, gte, lte, eq + threshold: float + severity: str # warning, critical + enabled: bool + + +@dataclass +class Alert: + """A triggered alert.""" + + rule: AlertRule + machine_id: str + value: float + triggered_at: datetime + + +class AlertEvaluator: + """Evaluates metrics against alert rules.""" + + OPERATORS = { + "gt": lambda v, t: v > t, + "lt": lambda v, t: v < t, + "gte": lambda v, t: v >= t, + "lte": lambda v, t: v <= t, + "eq": lambda v, t: v == t, + } + + def __init__(self, rules: list[AlertRule]): + self.rules = {r.metric_type: r for r in rules if r.enabled} + # Track active alerts to avoid duplicates + self.active_alerts: dict[str, Alert] = {} # key: f"{machine_id}:{rule_name}" + + def evaluate(self, machine_id: str, metrics: dict[str, float]) -> list[Alert]: + """Evaluate metrics against rules and return new alerts.""" + new_alerts = [] + + for metric_type, value in metrics.items(): + rule = self.rules.get(metric_type) + if not rule: + continue + + op_func = self.OPERATORS.get(rule.operator) + if not op_func: + continue + + alert_key = f"{machine_id}:{rule.name}" + + if op_func(value, rule.threshold): + # Threshold exceeded + if alert_key not in self.active_alerts: + alert = Alert( + rule=rule, + machine_id=machine_id, + value=value, + triggered_at=datetime.utcnow(), + ) + self.active_alerts[alert_key] = alert + new_alerts.append(alert) + else: + # Threshold no longer exceeded - resolve alert + if alert_key in self.active_alerts: + del self.active_alerts[alert_key] + + return new_alerts + + def update_rules(self, rules: list[AlertRule]) -> None: + """Update the rules being evaluated.""" + self.rules = {r.metric_type: r for r in rules if r.enabled} + + +class AlertsService: + """Main 
alerts service.""" + + def __init__(self): + self.config = get_alerts_config() + self.logger = setup_logging( + service_name=self.config.service_name, + log_level=self.config.log_level, + log_format=self.config.log_format, + ) + + self.running = False + self.db_pool: asyncpg.Pool | None = None + self.evaluator: AlertEvaluator | None = None + self.subscriber = get_subscriber(topics=["metrics.raw"]) + self.publisher = get_publisher(source="alerts") + + async def connect_db(self) -> None: + """Connect to TimescaleDB for rules and alert storage.""" + try: + self.db_pool = await asyncpg.create_pool( + self.config.timescale_url, + min_size=1, + max_size=5, + ) + self.logger.info("database_connected") + except Exception as e: + self.logger.warning("database_connection_failed", error=str(e)) + self.db_pool = None + + async def load_rules(self) -> list[AlertRule]: + """Load alert rules from database.""" + if not self.db_pool: + # Return default rules if no database + return [ + AlertRule( + 1, "High CPU Usage", "CPU_PERCENT", "gt", 80.0, "warning", True + ), + AlertRule( + 2, "Critical CPU Usage", "CPU_PERCENT", "gt", 95.0, "critical", True + ), + AlertRule( + 3, + "High Memory Usage", + "MEMORY_PERCENT", + "gt", + 85.0, + "warning", + True, + ), + AlertRule( + 4, + "Critical Memory Usage", + "MEMORY_PERCENT", + "gt", + 95.0, + "critical", + True, + ), + AlertRule( + 5, "High Disk Usage", "DISK_PERCENT", "gt", 80.0, "warning", True + ), + AlertRule( + 6, + "Critical Disk Usage", + "DISK_PERCENT", + "gt", + 90.0, + "critical", + True, + ), + ] + + async with self.db_pool.acquire() as conn: + rows = await conn.fetch( + "SELECT id, name, metric_type, operator, threshold, severity, enabled FROM alert_rules" + ) + + return [ + AlertRule( + id=row["id"], + name=row["name"], + metric_type=row["metric_type"], + operator=row["operator"], + threshold=row["threshold"], + severity=row["severity"], + enabled=row["enabled"], + ) + for row in rows + ] + + async def store_alert(self, 
alert: Alert) -> None: + """Store triggered alert in database.""" + if not self.db_pool: + return + + try: + async with self.db_pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO alerts (time, machine_id, rule_id, rule_name, metric_type, value, threshold, severity) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + """, + alert.triggered_at, + alert.machine_id, + alert.rule.id, + alert.rule.name, + alert.rule.metric_type, + alert.value, + alert.rule.threshold, + alert.rule.severity, + ) + except Exception as e: + self.logger.warning("alert_storage_failed", error=str(e)) + + async def publish_alert(self, alert: Alert) -> None: + """Publish alert event for other services (e.g., notifications).""" + await self.publisher.publish( + topic=f"alerts.{alert.rule.severity}", + payload={ + "rule_name": alert.rule.name, + "machine_id": alert.machine_id, + "metric_type": alert.rule.metric_type, + "value": alert.value, + "threshold": alert.rule.threshold, + "severity": alert.rule.severity, + "triggered_at": alert.triggered_at.isoformat(), + }, + ) + + async def process_metrics(self, event_data: dict[str, Any]) -> None: + """Process incoming metrics and evaluate alerts.""" + if not self.evaluator: + return + + machine_id = event_data.get("machine_id", "unknown") + metrics = event_data.get("metrics", {}) + + alerts = self.evaluator.evaluate(machine_id, metrics) + + for alert in alerts: + self.logger.warning( + "alert_triggered", + rule=alert.rule.name, + machine_id=alert.machine_id, + value=alert.value, + threshold=alert.rule.threshold, + severity=alert.rule.severity, + ) + + await self.store_alert(alert) + await self.publish_alert(alert) + + async def run(self) -> None: + """Main service loop.""" + self.running = True + + self.logger.info("alerts_service_starting") + + # Connect to database + await self.connect_db() + + # Load rules + rules = await self.load_rules() + self.evaluator = AlertEvaluator(rules) + self.logger.info("rules_loaded", count=len(rules)) + + # 
Connect to event bus + await self.subscriber.connect() + await self.publisher.connect() + + self.logger.info("alerts_service_started") + + try: + # Process events + async for event in self.subscriber.consume(): + if not self.running: + break + + try: + await self.process_metrics(event.payload) + except Exception as e: + self.logger.error("event_processing_error", error=str(e)) + + except asyncio.CancelledError: + self.logger.info("alerts_service_cancelled") + + finally: + await self.subscriber.disconnect() + await self.publisher.disconnect() + + if self.db_pool: + await self.db_pool.close() + + self.logger.info("alerts_service_stopped") + + def stop(self) -> None: + """Signal the service to stop.""" + self.running = False + + +async def main(): + """Main entry point.""" + service = AlertsService() + + # Handle shutdown signals + loop = asyncio.get_event_loop() + + def signal_handler(): + service.logger.info("shutdown_signal_received") + service.stop() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler) + + await service.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/alerts/requirements.txt b/services/alerts/requirements.txt index dc6d7d7..b80e87d 100644 --- a/services/alerts/requirements.txt +++ b/services/alerts/requirements.txt @@ -1,3 +1,5 @@ +grpcio>=1.60.0 +grpcio-tools>=1.60.0 redis>=5.0.0 asyncpg>=0.29.0 structlog>=23.2.0 diff --git a/services/collector/__init__.py b/services/collector/__init__.py new file mode 100644 index 0000000..4bb09d2 --- /dev/null +++ b/services/collector/__init__.py @@ -0,0 +1 @@ +"""Collector service.""" diff --git a/services/collector/main.py b/services/collector/main.py new file mode 100644 index 0000000..3f4dc52 --- /dev/null +++ b/services/collector/main.py @@ -0,0 +1,209 @@ +"""Collector service - streams system metrics to the aggregator via gRPC.""" + +import asyncio +import signal +import sys +from pathlib import Path + +import grpc + +# Add project 
root to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from services.collector.metrics import MetricsCollector +from shared import metrics_pb2, metrics_pb2_grpc +from shared.config import get_collector_config +from shared.logging import setup_logging + + +class CollectorService: + """Main collector service that streams metrics to the aggregator.""" + + def __init__(self): + self.config = get_collector_config() + self.logger = setup_logging( + service_name=self.config.service_name, + log_level=self.config.log_level, + log_format=self.config.log_format, + ) + self.running = False + self.channel: grpc.aio.Channel | None = None + self.stub: metrics_pb2_grpc.MetricsServiceStub | None = None + + self.collector = MetricsCollector( + machine_id=self.config.machine_id, + collect_cpu=self.config.collect_cpu, + collect_memory=self.config.collect_memory, + collect_disk=self.config.collect_disk, + collect_network=self.config.collect_network, + collect_load=self.config.collect_load, + ) + + async def connect(self) -> None: + """Establish connection to the aggregator.""" + self.logger.info( + "connecting_to_aggregator", + aggregator_url=self.config.aggregator_url, + ) + + self.channel = grpc.aio.insecure_channel( + self.config.aggregator_url, + options=[ + ("grpc.keepalive_time_ms", 10000), + ("grpc.keepalive_timeout_ms", 5000), + ("grpc.keepalive_permit_without_calls", True), + ], + ) + self.stub = metrics_pb2_grpc.MetricsServiceStub(self.channel) + + # Wait for channel to be ready + try: + await asyncio.wait_for( + self.channel.channel_ready(), + timeout=10.0, + ) + self.logger.info("connected_to_aggregator") + except asyncio.TimeoutError: + self.logger.error("connection_timeout") + raise + + async def disconnect(self) -> None: + """Close connection to the aggregator.""" + if self.channel: + await self.channel.close() + self.channel = None + self.stub = None + self.logger.info("disconnected_from_aggregator") + + def _batch_to_proto(self, 
batch) -> list[metrics_pb2.Metric]: + """Convert a MetricsBatch to protobuf messages.""" + protos = [] + for metric in batch.metrics: + proto = metrics_pb2.Metric( + machine_id=batch.machine_id, + hostname=batch.hostname, + timestamp_ms=batch.timestamp_ms, + type=getattr(metrics_pb2, metric.metric_type, 0), + value=metric.value, + labels=metric.labels, + ) + protos.append(proto) + return protos + + async def _metric_generator(self): + """Async generator that yields metrics at the configured interval.""" + while self.running: + batch = self.collector.collect() + protos = self._batch_to_proto(batch) + + for proto in protos: + yield proto + + self.logger.debug( + "collected_metrics", + count=len(protos), + machine_id=batch.machine_id, + ) + + await asyncio.sleep(self.config.collection_interval) + + async def stream_metrics(self) -> None: + """Stream metrics to the aggregator.""" + if not self.stub: + raise RuntimeError("Not connected to aggregator") + + retry_count = 0 + max_retries = 10 + base_delay = 1.0 + + while self.running: + try: + self.logger.info("starting_metric_stream") + + response = await self.stub.StreamMetrics(self._metric_generator()) + + self.logger.info( + "stream_completed", + success=response.success, + metrics_received=response.metrics_received, + message=response.message, + ) + + retry_count = 0 + + except grpc.aio.AioRpcError as e: + retry_count += 1 + delay = min(base_delay * (2**retry_count), 60.0) + + self.logger.warning( + "stream_error", + code=e.code().name, + details=e.details(), + retry_count=retry_count, + retry_delay=delay, + ) + + if retry_count >= max_retries: + self.logger.error("max_retries_exceeded") + raise + + await asyncio.sleep(delay) + + # Reconnect + try: + await self.disconnect() + await self.connect() + except Exception as conn_err: + self.logger.error("reconnect_failed", error=str(conn_err)) + + except asyncio.CancelledError: + self.logger.info("stream_cancelled") + break + + async def run(self) -> None: + """Main entry 
point for the collector service.""" + self.running = True + + self.logger.info( + "collector_starting", + machine_id=self.config.machine_id, + interval=self.config.collection_interval, + ) + + # Initial CPU percent call to initialize (first call always returns 0) + import psutil + + psutil.cpu_percent() + + await self.connect() + + try: + await self.stream_metrics() + finally: + await self.disconnect() + self.logger.info("collector_stopped") + + def stop(self) -> None: + """Signal the collector to stop.""" + self.running = False + + +async def main(): + """Main entry point.""" + service = CollectorService() + + # Handle shutdown signals + loop = asyncio.get_event_loop() + + def signal_handler(): + service.logger.info("shutdown_signal_received") + service.stop() + + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, signal_handler) + + await service.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/collector/metrics.py b/services/collector/metrics.py new file mode 100644 index 0000000..e1d3cdc --- /dev/null +++ b/services/collector/metrics.py @@ -0,0 +1,233 @@ +"""System metrics collection using psutil.""" + +import socket +import time +from dataclasses import dataclass, field + +import psutil + + +@dataclass +class MetricPoint: + """A single metric data point.""" + + metric_type: str + value: float + labels: dict[str, str] = field(default_factory=dict) + + +@dataclass +class MetricsBatch: + """A batch of metrics from a single collection cycle.""" + + machine_id: str + hostname: str + timestamp_ms: int + metrics: list[MetricPoint] + + +class MetricsCollector: + """Collects system metrics using psutil.""" + + def __init__( + self, + machine_id: str, + collect_cpu: bool = True, + collect_memory: bool = True, + collect_disk: bool = True, + collect_network: bool = True, + collect_load: bool = True, + ): + self.machine_id = machine_id + self.hostname = socket.gethostname() + + self.collect_cpu = collect_cpu + 
def collect(self) -> MetricsBatch:
    """Run every enabled collector once and bundle the results.

    Collectors run in a fixed order (CPU, memory, disk, network, load) and
    a collector is only called when its ``collect_*`` flag is truthy.

    Returns:
        A ``MetricsBatch`` carrying this collector's machine id and
        hostname, a millisecond timestamp taken after collection, and the
        gathered metric points.
    """
    # (enabled flag, collector) pairs, in the order their points are emitted.
    sources = (
        (self.collect_cpu, self._collect_cpu),
        (self.collect_memory, self._collect_memory),
        (self.collect_disk, self._collect_disk),
        (self.collect_network, self._collect_network),
        (self.collect_load, self._collect_load),
    )

    points: list[MetricPoint] = []
    for enabled, gather in sources:
        if enabled:
            points.extend(gather())

    now_ms = int(time.time() * 1000)
    return MetricsBatch(
        machine_id=self.machine_id,
        hostname=self.hostname,
        timestamp_ms=now_ms,
        metrics=points,
    )
def _collect_network(self) -> list[MetricPoint]:
    """Collect network throughput rates and the inet connection count.

    Rates are derived from the difference between the current
    ``psutil.net_io_counters()`` sample and the one stored by the previous
    call, so the first call emits no rate points. The stored sample and its
    timestamp are refreshed on every call. ``PermissionError`` and
    ``psutil.AccessDenied`` are swallowed; whatever was gathered before the
    error is still returned.
    """
    points = []

    try:
        counters = psutil.net_io_counters()
        now = time.time()

        previous = self._prev_net_io
        previous_time = self._prev_net_time

        if previous is not None and previous_time is not None:
            elapsed = now - previous_time
            # Guard against a zero or negative interval (clock adjustments).
            if elapsed > 0:
                rate_fields = (
                    ("NETWORK_SENT_BYTES_SEC", "bytes_sent"),
                    ("NETWORK_RECV_BYTES_SEC", "bytes_recv"),
                )
                for metric_type, field_name in rate_fields:
                    delta = getattr(counters, field_name) - getattr(
                        previous, field_name
                    )
                    points.append(
                        MetricPoint(metric_type=metric_type, value=delta / elapsed)
                    )

        # Remember this sample as the baseline for the next call.
        self._prev_net_io = counters
        self._prev_net_time = now

        # Connection count
        open_connections = psutil.net_connections(kind="inet")
        points.append(
            MetricPoint(
                metric_type="NETWORK_CONNECTIONS",
                value=float(len(open_connections)),
            )
        )
    except (PermissionError, psutil.AccessDenied):
        pass

    return points
metrics.append(MetricPoint(metric_type="LOAD_AVG_5M", value=load5)) + metrics.append(MetricPoint(metric_type="LOAD_AVG_15M", value=load15)) + except (AttributeError, OSError): + # Windows doesn't have getloadavg + pass + + # Process count + metrics.append( + MetricPoint( + metric_type="PROCESS_COUNT", + value=float(len(psutil.pids())), + ) + ) + + return metrics diff --git a/services/gateway/Dockerfile b/services/gateway/Dockerfile index ad54609..7e91329 100644 --- a/services/gateway/Dockerfile +++ b/services/gateway/Dockerfile @@ -21,6 +21,8 @@ RUN python -m grpc_tools.protoc \ /app/proto/metrics.proto COPY services/gateway /app/services/gateway +COPY services/aggregator/__init__.py /app/services/aggregator/__init__.py +COPY services/aggregator/storage.py /app/services/aggregator/storage.py COPY web /app/web ENV PYTHONPATH=/app diff --git a/services/gateway/__init__.py b/services/gateway/__init__.py new file mode 100644 index 0000000..55780d3 --- /dev/null +++ b/services/gateway/__init__.py @@ -0,0 +1 @@ +"""Gateway service.""" diff --git a/services/gateway/main.py b/services/gateway/main.py new file mode 100644 index 0000000..0f66c57 --- /dev/null +++ b/services/gateway/main.py @@ -0,0 +1,393 @@ +"""Gateway service - FastAPI with WebSocket for real-time dashboard.""" + +import asyncio +import json +import sys +from contextlib import asynccontextmanager +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any + +import grpc +from fastapi import FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect +from fastapi.requests import Request +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates + +# Add project root to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from services.aggregator.storage import TimescaleStorage +from shared import metrics_pb2, metrics_pb2_grpc +from shared.config import 
class ConnectionManager:
    """Tracks accepted WebSocket connections and broadcasts to all of them.

    Connections are kept in ``active_connections`` in the order they were
    accepted. Removal is idempotent, so a connection pruned during a failed
    broadcast can later be passed to ``disconnect`` without error.
    """

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept *websocket* and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info("websocket_connected", total=len(self.active_connections))

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking *websocket*; a no-op if it is not tracked.

        ``broadcast`` may already have removed a connection whose send
        failed. The endpoint's cleanup still calls this method afterwards,
        so a missing entry must not raise.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            return
        logger.info("websocket_disconnected", total=len(self.active_connections))

    async def broadcast(self, message: dict) -> None:
        """Broadcast message to all connected clients.

        The message is serialised to JSON once. A connection whose send
        raises is dropped from the manager; the error is not propagated.
        """
        if not self.active_connections:
            return

        data = json.dumps(message)

        # Iterate over a snapshot: each send awaits, and other tasks may
        # connect or disconnect clients while this loop is suspended.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(data)
            except Exception:
                # Best effort: a failed send means the peer is gone.
                self.disconnect(connection)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup: connects to TimescaleDB (a failure is logged and leaves
    ``timescale`` as ``None``), opens the gRPC channel and stub for the
    aggregator, and starts ``event_listener`` as a background task.

    Shutdown: cancels the listener, closes the gRPC channel, and
    disconnects from TimescaleDB. Cleanup runs in ``finally`` blocks so it
    happens even when an exception is thrown in at ``yield`` or when the
    listener task had already failed on its own.
    """
    global timescale, grpc_channel, grpc_stub

    logger.info("gateway_starting", port=config.http_port)

    # Connect to TimescaleDB for historical queries
    timescale = TimescaleStorage(config.timescale_url)
    try:
        await timescale.connect()
    except Exception as e:
        logger.warning("timescale_connection_failed", error=str(e))
        timescale = None

    # Connect to aggregator via gRPC
    grpc_channel = grpc.aio.insecure_channel(config.aggregator_url)
    grpc_stub = metrics_pb2_grpc.MetricsServiceStub(grpc_channel)

    # Start event listener in background
    listener_task = asyncio.create_task(event_listener())

    logger.info("gateway_started")

    try:
        yield
    finally:
        listener_task.cancel()
        try:
            await listener_task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            # The listener died earlier; awaiting it re-raises that error.
            # Log it and carry on so the connections below still close.
            logger.warning("event_listener_failed", error=str(e))

        try:
            if grpc_channel:
                await grpc_channel.close()
        finally:
            if timescale:
                await timescale.disconnect()

            logger.info("gateway_stopped")
@app.get("/api/machines")
async def get_machines():
    """Return the current state of every machine known to the aggregator.

    Responds with 503 when no gRPC stub is available or when the
    ``GetAllStates`` call fails with a gRPC error.
    """
    if not grpc_stub:
        raise HTTPException(status_code=503, detail="Aggregator not connected")

    try:
        reply = await grpc_stub.GetAllStates(metrics_pb2.Empty(), timeout=5.0)

        # One entry per machine; metric values are keyed by enum name.
        summaries = [
            {
                "machine_id": machine.machine_id,
                "hostname": machine.hostname,
                "last_seen_ms": machine.last_seen_ms,
                "health": metrics_pb2.HealthStatus.Name(machine.health),
                "metrics": {
                    metrics_pb2.MetricType.Name(sample.type): sample.value
                    for sample in machine.current_metrics
                },
            }
            for machine in reply.machines
        ]

        return {"machines": summaries}

    except grpc.aio.AioRpcError as e:
        raise HTTPException(status_code=503, detail=f"Aggregator error: {e.details()}")
@app.get("/api/metrics")
async def get_metrics(
    machine_id: str | None = Query(None),
    metric_type: str | None = Query(None),
    minutes: int = Query(60, ge=1, le=1440),
    limit: int = Query(1000, ge=1, le=10000),
):
    """Get historical metrics for a trailing time window.

    Args:
        machine_id: Optional machine filter passed through to storage.
        metric_type: Optional metric-type filter passed through to storage.
        minutes: Length of the window ending now (1 to 1440).
        limit: Maximum number of rows requested (1 to 10000).

    Returns:
        ``{"metrics": [...], "count": n}``.

    Raises:
        HTTPException: 503 when TimescaleDB is not connected, 500 when the
            storage query fails.
    """
    # Local import keeps this block self-contained; the module imports only
    # ``datetime`` and ``timedelta`` from the datetime package.
    from datetime import timezone

    if not timescale:
        raise HTTPException(status_code=503, detail="TimescaleDB not connected")

    # ``datetime.utcnow()`` is deprecated. This yields the same naive UTC
    # value, so the datetimes handed to storage keep their shape.
    end_time = datetime.now(timezone.utc).replace(tzinfo=None)
    start_time = end_time - timedelta(minutes=minutes)

    try:
        metrics = await timescale.get_metrics(
            machine_id=machine_id,
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
        )

        return {"metrics": metrics, "count": len(metrics)}

    except Exception as e:
        # Chain the cause so the original traceback survives in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
"machine_id": state.machine_id, + "hostname": state.hostname, + "metrics": metrics, + }, + } + ) + except Exception as e: + logger.warning("initial_state_error", error=str(e)) + + # Keep connection alive and handle incoming messages + while True: + try: + data = await websocket.receive_text() + # Handle ping/pong or commands from client + if data == "ping": + await websocket.send_text("pong") + except WebSocketDisconnect: + break + + finally: + manager.disconnect(websocket) + + +# ============================================================================ +# Dashboard (HTML) +# ============================================================================ + + +@app.get("/", response_class=HTMLResponse) +async def dashboard(request: Request): + """Serve the dashboard HTML.""" + if templates: + return templates.TemplateResponse("dashboard.html", {"request": request}) + + # Fallback if templates not found + return HTMLResponse(""" + + +
+Dashboard template not found. API endpoints:
class BaseConfig(BaseSettings):
    """Base configuration shared across all services.

    Declares the settings every service has in common. The per-service
    config classes in this module subclass it and override
    ``service_name``.
    """

    # Settings-source options: names ``.env`` (UTF-8) as an env file and
    # sets ``extra="ignore"`` for entries that match no declared field.
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Service identification
    service_name: str = "unknown"
    machine_id: str = "local"

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"  # "json" or "console"

    # Redis
    redis_url: str = "redis://localhost:6379"

    # Events
    # NOTE(review): presumably selects the backend used by shared.events;
    # confirm against that module.
    events_backend: str = "redis_pubsub"
class AlertsConfig(BaseConfig):
    """Alerts service configuration.

    Adds the TimescaleDB connection URL to the shared base settings and
    sets ``service_name`` to ``"alerts"``.
    """

    service_name: str = "alerts"

    # TimescaleDB - can be set directly via TIMESCALE_URL.
    # NOTE(review): the previous comment also said the URL could be "built
    # from components"; no such logic is visible in this class - confirm
    # whether it exists elsewhere or the remark is stale.
    timescale_url: str = "postgresql://monitor:monitor@localhost:5432/monitor"
def get_logger(name: str | None = None) -> structlog.BoundLogger:
    """Return a structlog logger, bound to ``component=name`` when given.

    A falsy *name* (``None`` or an empty string) yields a logger with no
    extra bound context.
    """
    bound_context = {"component": name} if name else {}
    return structlog.get_logger(**bound_context)
+# NO CHECKED-IN PROTOBUF GENCODE +# source: metrics.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'metrics.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rmetrics.proto\x12\nmonitoring\"\x07\n\x05\x45mpty\"\xd8\x01\n\x06Metric\x12\x12\n\nmachine_id\x18\x01 \x01(\t\x12\x10\n\x08hostname\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\x12$\n\x04type\x18\x04 \x01(\x0e\x32\x16.monitoring.MetricType\x12\r\n\x05value\x18\x05 \x01(\x01\x12.\n\x06labels\x18\x06 \x03(\x0b\x32\x1e.monitoring.Metric.LabelsEntry\x1a-\n\x0bLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"s\n\x0bMetricBatch\x12\x12\n\nmachine_id\x18\x01 \x01(\t\x12\x10\n\x08hostname\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\x12(\n\x07metrics\x18\x04 \x03(\x0b\x32\x17.monitoring.MetricPoint\"\xa6\x01\n\x0bMetricPoint\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.monitoring.MetricType\x12\r\n\x05value\x18\x02 \x01(\x01\x12\x33\n\x06labels\x18\x03 \x03(\x0b\x32#.monitoring.MetricPoint.LabelsEntry\x1a-\n\x0bLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"G\n\tStreamAck\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x18\n\x10metrics_received\x18\x02 \x01(\x03\x12\x0f\n\x07message\x18\x03 \x01(\t\"\"\n\x0cStateRequest\x12\x12\n\nmachine_id\x18\x01 \x01(\t\"\x8c\x02\n\x0cMachineState\x12\x12\n\nmachine_id\x18\x01 \x01(\t\x12\x10\n\x08hostname\x18\x02 
\x01(\t\x12\x14\n\x0clast_seen_ms\x18\x03 \x01(\x03\x12+\n\x0f\x63urrent_metrics\x18\x04 \x03(\x0b\x32\x12.monitoring.Metric\x12(\n\x06health\x18\x05 \x01(\x0e\x32\x18.monitoring.HealthStatus\x12\x38\n\x08metadata\x18\x06 \x03(\x0b\x32&.monitoring.MachineState.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\">\n\x10\x41llMachinesState\x12*\n\x08machines\x18\x01 \x03(\x0b\x32\x18.monitoring.MachineState\"\xd7\x01\n\x0e\x43ontrolCommand\x12\x12\n\ncommand_id\x18\x01 \x01(\t\x12<\n\x0fupdate_interval\x18\x02 \x01(\x0b\x32!.monitoring.UpdateIntervalCommandH\x00\x12\x37\n\x07restart\x18\x03 \x01(\x0b\x32$.monitoring.RestartCollectionCommandH\x00\x12/\n\x08shutdown\x18\x04 \x01(\x0b\x32\x1b.monitoring.ShutdownCommandH\x00\x42\t\n\x07\x63ommand\"1\n\x15UpdateIntervalCommand\x12\x18\n\x10interval_seconds\x18\x01 \x01(\x05\"\x1a\n\x18RestartCollectionCommand\"#\n\x0fShutdownCommand\x12\x10\n\x08graceful\x18\x01 \x01(\x08\"G\n\x0f\x43ontrolResponse\x12\x12\n\ncommand_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"#\n\rConfigRequest\x12\x12\n\nmachine_id\x18\x01 \x01(\t\"\x80\x02\n\x0f\x43ollectorConfig\x12#\n\x1b\x63ollection_interval_seconds\x18\x01 \x01(\x05\x12/\n\x0f\x65nabled_metrics\x18\x02 \x03(\x0e\x32\x16.monitoring.MetricType\x12\x37\n\x06labels\x18\x03 \x03(\x0b\x32\'.monitoring.CollectorConfig.LabelsEntry\x12/\n\nthresholds\x18\x04 \x03(\x0b\x32\x1b.monitoring.ThresholdConfig\x1a-\n\x0bLabelsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"u\n\x0fThresholdConfig\x12+\n\x0bmetric_type\x18\x01 \x01(\x0e\x32\x16.monitoring.MetricType\x12\x19\n\x11warning_threshold\x18\x02 \x01(\x01\x12\x1a\n\x12\x63ritical_threshold\x18\x03 
\x01(\x01*\x8d\x03\n\nMetricType\x12\x1b\n\x17METRIC_TYPE_UNSPECIFIED\x10\x00\x12\x0f\n\x0b\x43PU_PERCENT\x10\x01\x12\x18\n\x14\x43PU_PERCENT_PER_CORE\x10\x02\x12\x12\n\x0eMEMORY_PERCENT\x10\x03\x12\x15\n\x11MEMORY_USED_BYTES\x10\x04\x12\x1a\n\x16MEMORY_AVAILABLE_BYTES\x10\x05\x12\x10\n\x0c\x44ISK_PERCENT\x10\x06\x12\x13\n\x0f\x44ISK_USED_BYTES\x10\x07\x12\x17\n\x13\x44ISK_READ_BYTES_SEC\x10\x08\x12\x18\n\x14\x44ISK_WRITE_BYTES_SEC\x10\t\x12\x1a\n\x16NETWORK_SENT_BYTES_SEC\x10\n\x12\x1a\n\x16NETWORK_RECV_BYTES_SEC\x10\x0b\x12\x17\n\x13NETWORK_CONNECTIONS\x10\x0c\x12\x11\n\rPROCESS_COUNT\x10\r\x12\x0f\n\x0bLOAD_AVG_1M\x10\x0e\x12\x0f\n\x0bLOAD_AVG_5M\x10\x0f\x12\x10\n\x0cLOAD_AVG_15M\x10\x10*o\n\x0cHealthStatus\x12\x1d\n\x19HEALTH_STATUS_UNSPECIFIED\x10\x00\x12\x0b\n\x07HEALTHY\x10\x01\x12\x0b\n\x07WARNING\x10\x02\x12\x0c\n\x08\x43RITICAL\x10\x03\x12\x0b\n\x07UNKNOWN\x10\x04\x12\x0b\n\x07OFFLINE\x10\x05\x32\xdc\x01\n\x0eMetricsService\x12>\n\rStreamMetrics\x12\x12.monitoring.Metric\x1a\x15.monitoring.StreamAck\"\x00(\x01\x12G\n\x0fGetCurrentState\x12\x18.monitoring.StateRequest\x1a\x18.monitoring.MachineState\"\x00\x12\x41\n\x0cGetAllStates\x12\x11.monitoring.Empty\x1a\x1c.monitoring.AllMachinesState\"\x00\x32Z\n\x0e\x43ontrolService\x12H\n\x07\x43ontrol\x12\x1a.monitoring.ControlCommand\x1a\x1b.monitoring.ControlResponse\"\x00(\x01\x30\x01\x32\xa1\x01\n\rConfigService\x12\x45\n\tGetConfig\x12\x19.monitoring.ConfigRequest\x1a\x1b.monitoring.CollectorConfig\"\x00\x12I\n\x0bWatchConfig\x12\x19.monitoring.ConfigRequest\x1a\x1b.monitoring.CollectorConfig\"\x00\x30\x01\x42%Z#github.com/your-org/sysmonstm/protob\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'metrics_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + _globals['DESCRIPTOR']._loaded_options = None + _globals['DESCRIPTOR']._serialized_options = b'Z#github.com/your-org/sysmonstm/proto' + 
_globals['_METRIC_LABELSENTRY']._loaded_options = None + _globals['_METRIC_LABELSENTRY']._serialized_options = b'8\001' + _globals['_METRICPOINT_LABELSENTRY']._loaded_options = None + _globals['_METRICPOINT_LABELSENTRY']._serialized_options = b'8\001' + _globals['_MACHINESTATE_METADATAENTRY']._loaded_options = None + _globals['_MACHINESTATE_METADATAENTRY']._serialized_options = b'8\001' + _globals['_COLLECTORCONFIG_LABELSENTRY']._loaded_options = None + _globals['_COLLECTORCONFIG_LABELSENTRY']._serialized_options = b'8\001' + _globals['_METRICTYPE']._serialized_start=1810 + _globals['_METRICTYPE']._serialized_end=2207 + _globals['_HEALTHSTATUS']._serialized_start=2209 + _globals['_HEALTHSTATUS']._serialized_end=2320 + _globals['_EMPTY']._serialized_start=29 + _globals['_EMPTY']._serialized_end=36 + _globals['_METRIC']._serialized_start=39 + _globals['_METRIC']._serialized_end=255 + _globals['_METRIC_LABELSENTRY']._serialized_start=210 + _globals['_METRIC_LABELSENTRY']._serialized_end=255 + _globals['_METRICBATCH']._serialized_start=257 + _globals['_METRICBATCH']._serialized_end=372 + _globals['_METRICPOINT']._serialized_start=375 + _globals['_METRICPOINT']._serialized_end=541 + _globals['_METRICPOINT_LABELSENTRY']._serialized_start=210 + _globals['_METRICPOINT_LABELSENTRY']._serialized_end=255 + _globals['_STREAMACK']._serialized_start=543 + _globals['_STREAMACK']._serialized_end=614 + _globals['_STATEREQUEST']._serialized_start=616 + _globals['_STATEREQUEST']._serialized_end=650 + _globals['_MACHINESTATE']._serialized_start=653 + _globals['_MACHINESTATE']._serialized_end=921 + _globals['_MACHINESTATE_METADATAENTRY']._serialized_start=874 + _globals['_MACHINESTATE_METADATAENTRY']._serialized_end=921 + _globals['_ALLMACHINESSTATE']._serialized_start=923 + _globals['_ALLMACHINESSTATE']._serialized_end=985 + _globals['_CONTROLCOMMAND']._serialized_start=988 + _globals['_CONTROLCOMMAND']._serialized_end=1203 + _globals['_UPDATEINTERVALCOMMAND']._serialized_start=1205 + 
_globals['_UPDATEINTERVALCOMMAND']._serialized_end=1254 + _globals['_RESTARTCOLLECTIONCOMMAND']._serialized_start=1256 + _globals['_RESTARTCOLLECTIONCOMMAND']._serialized_end=1282 + _globals['_SHUTDOWNCOMMAND']._serialized_start=1284 + _globals['_SHUTDOWNCOMMAND']._serialized_end=1319 + _globals['_CONTROLRESPONSE']._serialized_start=1321 + _globals['_CONTROLRESPONSE']._serialized_end=1392 + _globals['_CONFIGREQUEST']._serialized_start=1394 + _globals['_CONFIGREQUEST']._serialized_end=1429 + _globals['_COLLECTORCONFIG']._serialized_start=1432 + _globals['_COLLECTORCONFIG']._serialized_end=1688 + _globals['_COLLECTORCONFIG_LABELSENTRY']._serialized_start=210 + _globals['_COLLECTORCONFIG_LABELSENTRY']._serialized_end=255 + _globals['_THRESHOLDCONFIG']._serialized_start=1690 + _globals['_THRESHOLDCONFIG']._serialized_end=1807 + _globals['_METRICSSERVICE']._serialized_start=2323 + _globals['_METRICSSERVICE']._serialized_end=2543 + _globals['_CONTROLSERVICE']._serialized_start=2545 + _globals['_CONTROLSERVICE']._serialized_end=2635 + _globals['_CONFIGSERVICE']._serialized_start=2638 + _globals['_CONFIGSERVICE']._serialized_end=2799 +# @@protoc_insertion_point(module_scope) diff --git a/shared/metrics_pb2_grpc.py b/shared/metrics_pb2_grpc.py new file mode 100644 index 0000000..0d3e990 --- /dev/null +++ b/shared/metrics_pb2_grpc.py @@ -0,0 +1,385 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! 
+"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from shared import metrics_pb2 as metrics__pb2 + +GRPC_GENERATED_VERSION = '1.76.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + ' but the generated code in metrics_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class MetricsServiceStub(object): + """MetricsService handles streaming metrics from collectors to aggregator + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.StreamMetrics = channel.stream_unary( + '/monitoring.MetricsService/StreamMetrics', + request_serializer=metrics__pb2.Metric.SerializeToString, + response_deserializer=metrics__pb2.StreamAck.FromString, + _registered_method=True) + self.GetCurrentState = channel.unary_unary( + '/monitoring.MetricsService/GetCurrentState', + request_serializer=metrics__pb2.StateRequest.SerializeToString, + response_deserializer=metrics__pb2.MachineState.FromString, + _registered_method=True) + self.GetAllStates = channel.unary_unary( + '/monitoring.MetricsService/GetAllStates', + request_serializer=metrics__pb2.Empty.SerializeToString, + response_deserializer=metrics__pb2.AllMachinesState.FromString, + _registered_method=True) + + +class MetricsServiceServicer(object): + """MetricsService handles streaming metrics from collectors to aggregator + """ + + def StreamMetrics(self, request_iterator, context): + """Client-side streaming: collector streams metrics to aggregator + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetCurrentState(self, request, context): + """Get current state of a machine + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetAllStates(self, request, context): + """Get current state of all machines + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_MetricsServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'StreamMetrics': grpc.stream_unary_rpc_method_handler( + servicer.StreamMetrics, + request_deserializer=metrics__pb2.Metric.FromString, + response_serializer=metrics__pb2.StreamAck.SerializeToString, + ), + 'GetCurrentState': grpc.unary_unary_rpc_method_handler( + 
servicer.GetCurrentState, + request_deserializer=metrics__pb2.StateRequest.FromString, + response_serializer=metrics__pb2.MachineState.SerializeToString, + ), + 'GetAllStates': grpc.unary_unary_rpc_method_handler( + servicer.GetAllStates, + request_deserializer=metrics__pb2.Empty.FromString, + response_serializer=metrics__pb2.AllMachinesState.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'monitoring.MetricsService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('monitoring.MetricsService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class MetricsService(object): + """MetricsService handles streaming metrics from collectors to aggregator + """ + + @staticmethod + def StreamMetrics(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary( + request_iterator, + target, + '/monitoring.MetricsService/StreamMetrics', + metrics__pb2.Metric.SerializeToString, + metrics__pb2.StreamAck.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetCurrentState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/monitoring.MetricsService/GetCurrentState', + metrics__pb2.StateRequest.SerializeToString, + metrics__pb2.MachineState.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def GetAllStates(request, + target, + options=(), 
+ channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/monitoring.MetricsService/GetAllStates', + metrics__pb2.Empty.SerializeToString, + metrics__pb2.AllMachinesState.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + +class ControlServiceStub(object): + """ControlService handles bidirectional control commands + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Control = channel.stream_stream( + '/monitoring.ControlService/Control', + request_serializer=metrics__pb2.ControlCommand.SerializeToString, + response_deserializer=metrics__pb2.ControlResponse.FromString, + _registered_method=True) + + +class ControlServiceServicer(object): + """ControlService handles bidirectional control commands + """ + + def Control(self, request_iterator, context): + """Bidirectional streaming for commands and responses + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ControlServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'Control': grpc.stream_stream_rpc_method_handler( + servicer.Control, + request_deserializer=metrics__pb2.ControlCommand.FromString, + response_serializer=metrics__pb2.ControlResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'monitoring.ControlService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('monitoring.ControlService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. 
+class ControlService(object): + """ControlService handles bidirectional control commands + """ + + @staticmethod + def Control(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream( + request_iterator, + target, + '/monitoring.ControlService/Control', + metrics__pb2.ControlCommand.SerializeToString, + metrics__pb2.ControlResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + +class ConfigServiceStub(object): + """ConfigService handles dynamic configuration + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.GetConfig = channel.unary_unary( + '/monitoring.ConfigService/GetConfig', + request_serializer=metrics__pb2.ConfigRequest.SerializeToString, + response_deserializer=metrics__pb2.CollectorConfig.FromString, + _registered_method=True) + self.WatchConfig = channel.unary_stream( + '/monitoring.ConfigService/WatchConfig', + request_serializer=metrics__pb2.ConfigRequest.SerializeToString, + response_deserializer=metrics__pb2.CollectorConfig.FromString, + _registered_method=True) + + +class ConfigServiceServicer(object): + """ConfigService handles dynamic configuration + """ + + def GetConfig(self, request, context): + """Get current configuration for a collector + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def WatchConfig(self, request, context): + """Stream configuration updates + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ConfigServiceServicer_to_server(servicer, server): + 
rpc_method_handlers = { + 'GetConfig': grpc.unary_unary_rpc_method_handler( + servicer.GetConfig, + request_deserializer=metrics__pb2.ConfigRequest.FromString, + response_serializer=metrics__pb2.CollectorConfig.SerializeToString, + ), + 'WatchConfig': grpc.unary_stream_rpc_method_handler( + servicer.WatchConfig, + request_deserializer=metrics__pb2.ConfigRequest.FromString, + response_serializer=metrics__pb2.CollectorConfig.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'monitoring.ConfigService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('monitoring.ConfigService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class ConfigService(object): + """ConfigService handles dynamic configuration + """ + + @staticmethod + def GetConfig(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/monitoring.ConfigService/GetConfig', + metrics__pb2.ConfigRequest.SerializeToString, + metrics__pb2.CollectorConfig.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def WatchConfig(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/monitoring.ConfigService/WatchConfig', + metrics__pb2.ConfigRequest.SerializeToString, + metrics__pb2.CollectorConfig.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/web/static/.gitkeep 
b/web/static/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/web/templates/dashboard.html b/web/templates/dashboard.html new file mode 100644 index 0000000..397f26c --- /dev/null +++ b/web/templates/dashboard.html @@ -0,0 +1,358 @@ + + + + + +
Waiting for collectors to send metrics...
+