"""Alerts service - subscribes to metrics events and evaluates thresholds."""

import asyncio
import signal
import sys
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from operator import eq, ge, gt, le, lt
from pathlib import Path
from typing import Any, ClassVar

import asyncpg

# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from shared.config import get_alerts_config
from shared.events import get_publisher, get_subscriber
from shared.logging import setup_logging


@dataclass
class AlertRule:
    """An alert rule configuration."""

    id: int
    name: str
    metric_type: str
    operator: str  # gt, lt, gte, lte, eq
    threshold: float
    severity: str  # warning, critical
    enabled: bool


@dataclass
class Alert:
    """A triggered alert."""

    rule: AlertRule
    machine_id: str
    value: float
    triggered_at: datetime


class AlertEvaluator:
    """Evaluates metrics against alert rules.

    Several rules may target the same metric type (for example a warning
    and a critical threshold); every enabled rule is evaluated on its own.
    """

    # Maps the rule's operator code to a ``(value, threshold) -> bool`` test.
    OPERATORS: ClassVar[dict[str, Callable[[Any, Any], bool]]] = {
        "gt": gt,
        "lt": lt,
        "gte": ge,
        "lte": le,
        "eq": eq,
    }

    def __init__(self, rules: list[AlertRule]):
        # Enabled rules grouped by metric type, in the order they were given.
        self.rules: dict[str, list[AlertRule]] = self._group_enabled(rules)
        # Track active alerts to avoid duplicates
        self.active_alerts: dict[str, Alert] = {}  # key: f"{machine_id}:{rule_name}"

    @staticmethod
    def _group_enabled(rules: list[AlertRule]) -> dict[str, list[AlertRule]]:
        """Group the enabled rules by ``metric_type``, keeping all of them."""
        grouped: dict[str, list[AlertRule]] = {}
        for rule in rules:
            if rule.enabled:
                grouped.setdefault(rule.metric_type, []).append(rule)
        return grouped

    def evaluate(self, machine_id: str, metrics: dict[str, float]) -> list[Alert]:
        """Evaluate metrics against rules and return new alerts.

        Args:
            machine_id: Identifier of the machine the metrics belong to.
            metrics: Mapping of metric type to its current value.

        Returns:
            Alerts that started firing with this call. A rule that is
            already active for the machine is not reported again until its
            condition has stopped holding at least once.
        """
        new_alerts: list[Alert] = []
        for metric_type, value in metrics.items():
            for rule in self.rules.get(metric_type, ()):
                compare = self.OPERATORS.get(rule.operator)
                if compare is None:
                    # Unknown operator code: the rule cannot be evaluated.
                    continue

                try:
                    breached = compare(value, rule.threshold)
                except TypeError:
                    # Non-comparable value (e.g. None). Skip it instead of
                    # aborting, so alerts already marked active during this
                    # call are still returned to the caller.
                    continue

                alert_key = f"{machine_id}:{rule.name}"
                if not breached:
                    # Threshold no longer exceeded - resolve alert
                    self.active_alerts.pop(alert_key, None)
                    continue
                if alert_key in self.active_alerts:
                    continue

                # NOTE(review): utcnow() yields a naive datetime and is
                # deprecated since Python 3.12; switching to an aware value
                # changes what is stored/published, so confirm the column
                # type and consumers before migrating.
                alert = Alert(
                    rule=rule,
                    machine_id=machine_id,
                    value=value,
                    triggered_at=datetime.utcnow(),
                )
                self.active_alerts[alert_key] = alert
                new_alerts.append(alert)
        return new_alerts

    def update_rules(self, rules: list[AlertRule]) -> None:
        """Update the rules being evaluated.

        Active alerts whose rule name is no longer among the enabled rules
        are dropped, because nothing would ever resolve them otherwise.
        """
        self.rules = self._group_enabled(rules)
        enabled_names = {
            rule.name for group in self.rules.values() for rule in group
        }
        stale_keys = [
            key
            for key, alert in self.active_alerts.items()
            if alert.rule.name not in enabled_names
        ]
        for key in stale_keys:
            del self.active_alerts[key]


class AlertsService:
    """Main alerts service."""

    def __init__(self):
        self.config = get_alerts_config()
        self.logger = setup_logging(
            service_name=self.config.service_name,
            log_level=self.config.log_level,
            log_format=self.config.log_format,
        )
        self.running = False
        self.db_pool: asyncpg.Pool | None = None
        self.evaluator: AlertEvaluator | None = None
        self.subscriber = get_subscriber(topics=["metrics.raw"])
        self.publisher = get_publisher(source="alerts")
        # Bookkeeping that lets stop() interrupt an idle wait for events.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._run_task: asyncio.Task | None = None
        self._processing = False

    async def connect_db(self) -> None:
        """Connect to TimescaleDB for rules and alert storage.

        A connection failure is logged and leaves ``db_pool`` as ``None``;
        the service then runs with default rules and without alert storage.
        """
        try:
            self.db_pool = await asyncpg.create_pool(
                self.config.timescale_url,
                min_size=1,
                max_size=5,
            )
            self.logger.info("database_connected")
        except Exception as e:
            self.logger.warning("database_connection_failed", error=str(e))
            self.db_pool = None

    @staticmethod
    def _default_rules() -> list[AlertRule]:
        """Return a fresh copy of the built-in rules used without a database."""
        return [
            AlertRule(1, "High CPU Usage", "CPU_PERCENT", "gt", 80.0, "warning", True),
            AlertRule(
                2, "Critical CPU Usage", "CPU_PERCENT", "gt", 95.0, "critical", True
            ),
            AlertRule(
                3, "High Memory Usage", "MEMORY_PERCENT", "gt", 85.0, "warning", True
            ),
            AlertRule(
                4,
                "Critical Memory Usage",
                "MEMORY_PERCENT",
                "gt",
                95.0,
                "critical",
                True,
            ),
            AlertRule(
                5, "High Disk Usage", "DISK_PERCENT", "gt", 80.0, "warning", True
            ),
            AlertRule(
                6, "Critical Disk Usage", "DISK_PERCENT", "gt", 90.0, "critical", True
            ),
        ]

    async def load_rules(self) -> list[AlertRule]:
        """Load alert rules from database.

        Returns:
            Every row of ``alert_rules`` (enabled or not) when a database
            pool is available, otherwise the built-in default rules.
        """
        if not self.db_pool:
            # Return default rules if no database
            return self._default_rules()

        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id, name, metric_type, operator, threshold, severity, enabled FROM alert_rules"
            )
        return [
            AlertRule(
                id=row["id"],
                name=row["name"],
                metric_type=row["metric_type"],
                operator=row["operator"],
                threshold=row["threshold"],
                severity=row["severity"],
                enabled=row["enabled"],
            )
            for row in rows
        ]

    async def store_alert(self, alert: Alert) -> None:
        """Store triggered alert in database.

        Best effort: does nothing without a database pool, and a failed
        insert is logged rather than raised.
        """
        if not self.db_pool:
            return

        try:
            async with self.db_pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO alerts (time, machine_id, rule_id, rule_name, metric_type, value, threshold, severity)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                    """,
                    alert.triggered_at,
                    alert.machine_id,
                    alert.rule.id,
                    alert.rule.name,
                    alert.rule.metric_type,
                    alert.value,
                    alert.rule.threshold,
                    alert.rule.severity,
                )
        except Exception as e:
            self.logger.warning("alert_storage_failed", error=str(e))

    async def publish_alert(self, alert: Alert) -> None:
        """Publish alert event for other services (e.g., notifications).

        The topic is ``alerts.<severity>`` of the rule that fired.
        """
        await self.publisher.publish(
            topic=f"alerts.{alert.rule.severity}",
            payload={
                "rule_name": alert.rule.name,
                "machine_id": alert.machine_id,
                "metric_type": alert.rule.metric_type,
                "value": alert.value,
                "threshold": alert.rule.threshold,
                "severity": alert.rule.severity,
                "triggered_at": alert.triggered_at.isoformat(),
            },
        )

    async def process_metrics(self, event_data: dict[str, Any]) -> None:
        """Process incoming metrics and evaluate alerts.

        Args:
            event_data: Event payload; ``machine_id`` (default ``"unknown"``)
                and ``metrics`` (default ``{}``) are read from it.
        """
        if not self.evaluator:
            return

        machine_id = event_data.get("machine_id", "unknown")
        metrics = event_data.get("metrics", {})

        for alert in self.evaluator.evaluate(machine_id, metrics):
            self.logger.warning(
                "alert_triggered",
                rule=alert.rule.name,
                machine_id=alert.machine_id,
                value=alert.value,
                threshold=alert.rule.threshold,
                severity=alert.rule.severity,
            )
            await self.store_alert(alert)
            await self.publish_alert(alert)

    async def _shutdown(
        self, *, subscriber_connected: bool, publisher_connected: bool
    ) -> None:
        """Release whatever was acquired, one resource at a time.

        Each step is isolated so a failure to close one resource does not
        prevent the remaining ones from being closed.
        """
        steps: list[tuple[str, Callable[[], Any]]] = []
        if subscriber_connected:
            steps.append(("subscriber", self.subscriber.disconnect))
        if publisher_connected:
            steps.append(("publisher", self.publisher.disconnect))
        if self.db_pool:
            steps.append(("database", self.db_pool.close))

        for resource, close in steps:
            try:
                await close()
            except Exception as e:
                self.logger.warning(
                    "shutdown_step_failed", resource=resource, error=str(e)
                )

    async def run(self) -> None:
        """Main service loop.

        Connects to the database and event bus, then evaluates every
        consumed event until the stream ends, ``stop()`` is called, or the
        task is cancelled. Acquired resources are released on every exit
        path, including a failure during startup.
        """
        self.running = True
        self._loop = asyncio.get_running_loop()
        self._run_task = asyncio.current_task()
        self.logger.info("alerts_service_starting")

        subscriber_connected = False
        publisher_connected = False
        try:
            # Connect to database
            await self.connect_db()

            # Load rules
            rules = await self.load_rules()
            self.evaluator = AlertEvaluator(rules)
            self.logger.info("rules_loaded", count=len(rules))

            # Connect to event bus
            await self.subscriber.connect()
            subscriber_connected = True
            await self.publisher.connect()
            publisher_connected = True

            self.logger.info("alerts_service_started")

            # Process events
            async for event in self.subscriber.consume():
                if not self.running:
                    break
                # While this flag is set, stop() will not cancel the task,
                # so the current event is always processed to completion.
                self._processing = True
                try:
                    await self.process_metrics(event.payload)
                except Exception as e:
                    self.logger.error("event_processing_error", error=str(e))
                finally:
                    self._processing = False
                if not self.running:
                    break
        except asyncio.CancelledError:
            self.logger.info("alerts_service_cancelled")
        finally:
            # Detach first so a late stop() cannot cancel the cleanup below.
            self._run_task = None
            self._loop = None
            await self._shutdown(
                subscriber_connected=subscriber_connected,
                publisher_connected=publisher_connected,
            )
            self.logger.info("alerts_service_stopped")

    def _cancel_if_idle(self) -> None:
        """Cancel the run task unless it is in the middle of an event."""
        task = self._run_task
        if task is not None and not task.done() and not self._processing:
            task.cancel()

    def stop(self) -> None:
        """Signal the service to stop.

        Clears the running flag and, if the service is only waiting
        (for the next event or during startup), cancels that wait so the
        shutdown does not depend on another event arriving. An event that is
        currently being processed is allowed to finish.
        """
        was_running = self.running
        self.running = False
        loop = self._loop
        if was_running and loop is not None:
            # Scheduled on the loop so the idle check and the cancel happen
            # in the loop's own thread, wherever stop() was called from.
            loop.call_soon_threadsafe(self._cancel_if_idle)


async def main():
    """Main entry point."""
    service = AlertsService()

    # Handle shutdown signals
    loop = asyncio.get_running_loop()

    def signal_handler():
        service.logger.info("shutdown_signal_received")
        service.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    await service.run()


if __name__ == "__main__":
    asyncio.run(main())