318 lines
9.6 KiB
Python
318 lines
9.6 KiB
Python
"""Alerts service - subscribes to metrics events and evaluates thresholds."""
|
|
|
|
import asyncio
|
|
import signal
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
|
|
# Add project root to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from shared.config import get_alerts_config
|
|
from shared.events import get_publisher, get_subscriber
|
|
from shared.logging import setup_logging
|
|
|
|
|
|
@dataclass
class AlertRule:
    """An alert rule configuration.

    One rule watches one metric type and fires when the configured operator
    comparison against ``threshold`` holds. Rules are loaded from the
    ``alert_rules`` table when a database is available, otherwise from
    hard-coded defaults.
    """

    id: int  # Row id in the alert_rules table (or a synthetic id for defaults)
    name: str  # Human-readable rule name, also used in the active-alert dedup key
    metric_type: str  # Metric this rule watches, e.g. "CPU_PERCENT"
    operator: str  # gt, lt, gte, lte, eq — must match AlertEvaluator.OPERATORS
    threshold: float  # Value the incoming metric is compared against
    severity: str  # warning, critical — also selects the alerts.<severity> topic
    enabled: bool  # Disabled rules are filtered out by the evaluator
|
|
|
|
|
|
@dataclass
class Alert:
    """A triggered alert.

    Produced by the evaluator when a metric crosses a rule's threshold;
    persisted to the ``alerts`` table and republished on the event bus.
    """

    rule: AlertRule  # The rule whose threshold was exceeded
    machine_id: str  # Machine the offending metric came from
    value: float  # Observed metric value that tripped the rule
    triggered_at: datetime  # When the alert fired (naive UTC via datetime.utcnow())
|
|
|
|
|
|
class AlertEvaluator:
    """Evaluates metrics against alert rules.

    Keeps a set of currently-active alerts so a rule that stays in breach is
    reported once, and is resolved automatically when the metric drops back
    below (or above) its threshold.
    """

    # Comparison callables keyed by the operator string stored on each rule.
    OPERATORS = {
        "gt": lambda v, t: v > t,
        "lt": lambda v, t: v < t,
        "gte": lambda v, t: v >= t,
        "lte": lambda v, t: v <= t,
        "eq": lambda v, t: v == t,
    }

    def __init__(self, rules: list[AlertRule]):
        # BUG FIX: the previous implementation stored rules in a plain dict
        # keyed by metric_type, so only the LAST rule per metric survived.
        # The default rule set pairs a "warning" and a "critical" rule on the
        # same metric (e.g. CPU 80% / 95%), which meant the warning rule was
        # silently shadowed and never fired. Grouping into lists lets every
        # enabled rule on a metric be evaluated.
        self.rules: dict[str, list[AlertRule]] = self._group_rules(rules)
        # Track active alerts to avoid duplicates
        self.active_alerts: dict[str, Alert] = {}  # key: f"{machine_id}:{rule_name}"

    @staticmethod
    def _group_rules(rules: list[AlertRule]) -> dict[str, list[AlertRule]]:
        """Group enabled rules by metric type, preserving input order."""
        grouped: dict[str, list[AlertRule]] = {}
        for rule in rules:
            if rule.enabled:
                grouped.setdefault(rule.metric_type, []).append(rule)
        return grouped

    def evaluate(self, machine_id: str, metrics: dict[str, float]) -> list[Alert]:
        """Evaluate metrics against rules and return new alerts.

        Args:
            machine_id: Identifier of the machine the metrics came from.
            metrics: Mapping of metric type (e.g. "CPU_PERCENT") to value.

        Returns:
            Alerts that newly fired on this evaluation. Rules already in
            breach are not re-reported; rules no longer in breach are
            resolved (dropped from ``active_alerts``) as a side effect.
        """
        new_alerts: list[Alert] = []

        for metric_type, value in metrics.items():
            # Every enabled rule on this metric gets a chance to fire.
            for rule in self.rules.get(metric_type, []):
                op_func = self.OPERATORS.get(rule.operator)
                if not op_func:
                    # Unknown operator on the rule — skip rather than crash.
                    continue

                alert_key = f"{machine_id}:{rule.name}"

                if op_func(value, rule.threshold):
                    # Threshold exceeded — raise only if not already active.
                    if alert_key not in self.active_alerts:
                        alert = Alert(
                            rule=rule,
                            machine_id=machine_id,
                            value=value,
                            # TODO(review): datetime.utcnow() is deprecated
                            # (naive UTC). Switching to an aware timestamp
                            # would change the value stored in the DB, so it
                            # is kept as-is pending a schema check.
                            triggered_at=datetime.utcnow(),
                        )
                        self.active_alerts[alert_key] = alert
                        new_alerts.append(alert)
                elif alert_key in self.active_alerts:
                    # Threshold no longer exceeded — resolve the alert.
                    del self.active_alerts[alert_key]

        return new_alerts

    def update_rules(self, rules: list[AlertRule]) -> None:
        """Update the rules being evaluated (e.g. after a reload from DB)."""
        self.rules = self._group_rules(rules)
|
|
|
|
|
|
class AlertsService:
    """Main alerts service.

    Subscribes to ``metrics.raw`` events, evaluates each metrics payload
    against alert rules (loaded from TimescaleDB, with hard-coded defaults
    when no database is available), persists triggered alerts, and
    republishes them on ``alerts.<severity>`` topics for downstream
    consumers such as notification services.
    """

    def __init__(self):
        # Configuration and structured logger come from the shared helpers;
        # the logger supports keyword-style event logging (structlog-like).
        self.config = get_alerts_config()
        self.logger = setup_logging(
            service_name=self.config.service_name,
            log_level=self.config.log_level,
            log_format=self.config.log_format,
        )

        # Set to True in run(); stop() flips it back to request shutdown.
        self.running = False
        # Stays None when the database is unreachable — the service then
        # degrades to default rules and skips alert persistence.
        self.db_pool: asyncpg.Pool | None = None
        # Created in run() once rules have been loaded.
        self.evaluator: AlertEvaluator | None = None
        self.subscriber = get_subscriber(topics=["metrics.raw"])
        self.publisher = get_publisher(source="alerts")

    async def connect_db(self) -> None:
        """Connect to TimescaleDB for rules and alert storage.

        Best-effort: on failure the pool is left as None and the service
        continues without persistence instead of crashing.
        """
        try:
            self.db_pool = await asyncpg.create_pool(
                self.config.timescale_url,
                min_size=1,
                max_size=5,
            )
            self.logger.info("database_connected")
        except Exception as e:
            # Deliberate broad catch: any connection problem downgrades the
            # service to database-less operation.
            self.logger.warning("database_connection_failed", error=str(e))
            self.db_pool = None

    async def load_rules(self) -> list[AlertRule]:
        """Load alert rules from database.

        Returns:
            Rules from the ``alert_rules`` table, or a built-in default set
            (warning + critical thresholds for CPU, memory, and disk) when
            no database connection is available.
        """
        if not self.db_pool:
            # Return default rules if no database
            return [
                AlertRule(
                    1, "High CPU Usage", "CPU_PERCENT", "gt", 80.0, "warning", True
                ),
                AlertRule(
                    2, "Critical CPU Usage", "CPU_PERCENT", "gt", 95.0, "critical", True
                ),
                AlertRule(
                    3,
                    "High Memory Usage",
                    "MEMORY_PERCENT",
                    "gt",
                    85.0,
                    "warning",
                    True,
                ),
                AlertRule(
                    4,
                    "Critical Memory Usage",
                    "MEMORY_PERCENT",
                    "gt",
                    95.0,
                    "critical",
                    True,
                ),
                AlertRule(
                    5, "High Disk Usage", "DISK_PERCENT", "gt", 80.0, "warning", True
                ),
                AlertRule(
                    6,
                    "Critical Disk Usage",
                    "DISK_PERCENT",
                    "gt",
                    90.0,
                    "critical",
                    True,
                ),
            ]

        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id, name, metric_type, operator, threshold, severity, enabled FROM alert_rules"
            )

        # Note: disabled rows are returned too; filtering on `enabled`
        # happens inside AlertEvaluator.
        return [
            AlertRule(
                id=row["id"],
                name=row["name"],
                metric_type=row["metric_type"],
                operator=row["operator"],
                threshold=row["threshold"],
                severity=row["severity"],
                enabled=row["enabled"],
            )
            for row in rows
        ]

    async def store_alert(self, alert: Alert) -> None:
        """Store triggered alert in database.

        No-op without a database; storage failures are logged and swallowed
        so alert publishing is never blocked by persistence problems.
        """
        if not self.db_pool:
            return

        try:
            async with self.db_pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO alerts (time, machine_id, rule_id, rule_name, metric_type, value, threshold, severity)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                    """,
                    alert.triggered_at,
                    alert.machine_id,
                    alert.rule.id,
                    alert.rule.name,
                    alert.rule.metric_type,
                    alert.value,
                    alert.rule.threshold,
                    alert.rule.severity,
                )
        except Exception as e:
            self.logger.warning("alert_storage_failed", error=str(e))

    async def publish_alert(self, alert: Alert) -> None:
        """Publish alert event for other services (e.g., notifications).

        The topic encodes severity (``alerts.warning`` / ``alerts.critical``)
        so consumers can subscribe selectively.
        """
        await self.publisher.publish(
            topic=f"alerts.{alert.rule.severity}",
            payload={
                "rule_name": alert.rule.name,
                "machine_id": alert.machine_id,
                "metric_type": alert.rule.metric_type,
                "value": alert.value,
                "threshold": alert.rule.threshold,
                "severity": alert.rule.severity,
                "triggered_at": alert.triggered_at.isoformat(),
            },
        )

    async def process_metrics(self, event_data: dict[str, Any]) -> None:
        """Process incoming metrics and evaluate alerts.

        Args:
            event_data: Event payload; expected to carry "machine_id" and a
                "metrics" mapping of metric type to value (missing keys fall
                back to "unknown" / empty dict).
        """
        # Guard against events arriving before run() finished startup.
        if not self.evaluator:
            return

        machine_id = event_data.get("machine_id", "unknown")
        metrics = event_data.get("metrics", {})

        alerts = self.evaluator.evaluate(machine_id, metrics)

        for alert in alerts:
            self.logger.warning(
                "alert_triggered",
                rule=alert.rule.name,
                machine_id=alert.machine_id,
                value=alert.value,
                threshold=alert.rule.threshold,
                severity=alert.rule.severity,
            )

            # Persist first (best-effort), then fan out on the event bus.
            await self.store_alert(alert)
            await self.publish_alert(alert)

    async def run(self) -> None:
        """Main service loop.

        Startup order: database -> rules -> event bus; then consumes metrics
        events until stop() is called or the task is cancelled. All
        connections are released in the finally block.
        """
        self.running = True

        self.logger.info("alerts_service_starting")

        # Connect to database
        await self.connect_db()

        # Load rules
        rules = await self.load_rules()
        self.evaluator = AlertEvaluator(rules)
        self.logger.info("rules_loaded", count=len(rules))

        # Connect to event bus
        await self.subscriber.connect()
        await self.publisher.connect()

        self.logger.info("alerts_service_started")

        try:
            # Process events
            async for event in self.subscriber.consume():
                # NOTE: the running flag is only checked when an event
                # arrives, so shutdown may wait for the next message.
                if not self.running:
                    break

                # Per-event isolation: one bad payload must not kill the loop.
                try:
                    await self.process_metrics(event.payload)
                except Exception as e:
                    self.logger.error("event_processing_error", error=str(e))

        except asyncio.CancelledError:
            self.logger.info("alerts_service_cancelled")

        finally:
            await self.subscriber.disconnect()
            await self.publisher.disconnect()

            if self.db_pool:
                await self.db_pool.close()

            self.logger.info("alerts_service_stopped")

    def stop(self) -> None:
        """Signal the service to stop."""
        self.running = False
|
|
|
|
|
|
async def main():
    """Main entry point.

    Creates the service, registers SIGTERM/SIGINT handlers on the running
    event loop, and runs until shutdown is requested.
    """
    service = AlertsService()

    # Handle shutdown signals.
    # FIX: asyncio.get_event_loop() is deprecated when called from within a
    # coroutine (Python 3.10+, and this file already uses 3.10+ `X | None`
    # syntax); get_running_loop() is the supported way to reach the loop here.
    loop = asyncio.get_running_loop()

    def signal_handler():
        service.logger.info("shutdown_signal_received")
        # stop() only flips the running flag; the consume loop exits on the
        # next event (see AlertsService.run).
        service.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    await service.run()


if __name__ == "__main__":
    asyncio.run(main())
|