Files
sysmonstm/services/alerts/main.py
2025-12-29 23:44:30 -03:00

318 lines
9.6 KiB
Python

"""Alerts service - subscribes to metrics events and evaluates thresholds."""
import asyncio
import signal
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import asyncpg
# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from shared.config import get_alerts_config
from shared.events import get_publisher, get_subscriber
from shared.logging import setup_logging
@dataclass
class AlertRule:
    """An alert rule configuration.

    Mirrors one row of the ``alert_rules`` table (see
    ``AlertsService.load_rules``); hard-coded defaults are used when no
    database connection is available.
    """

    id: int  # rule primary key (``alert_rules.id``)
    name: str  # human-readable rule name; also part of the alert dedup key
    metric_type: str  # metric this rule applies to, e.g. "CPU_PERCENT"
    operator: str  # gt, lt, gte, lte, eq
    threshold: float  # value the metric is compared against via ``operator``
    severity: str  # warning, critical
    enabled: bool  # disabled rules are skipped by the evaluator
@dataclass
class Alert:
    """A triggered alert: one rule breached by one machine's metric value."""

    rule: AlertRule  # the rule whose threshold was crossed
    machine_id: str  # machine that reported the offending value
    value: float  # the metric value that breached the threshold
    triggered_at: datetime  # first observation of the breach (naive UTC, from utcnow)
class AlertEvaluator:
    """Evaluates incoming metric values against a set of alert rules.

    Tracks currently-active alerts per (machine, rule) pair so a sustained
    threshold breach raises a single alert instead of one per metrics
    event, and resolves the alert once the condition clears.
    """

    # Comparison predicates keyed by the rule's operator string.
    OPERATORS = {
        "gt": lambda v, t: v > t,
        "lt": lambda v, t: v < t,
        "gte": lambda v, t: v >= t,
        "lte": lambda v, t: v <= t,
        "eq": lambda v, t: v == t,
    }

    def __init__(self, rules: "list[AlertRule]"):
        # BUG FIX: the previous {metric_type: rule} dict kept only ONE rule
        # per metric, silently dropping e.g. the warning-level CPU rule when
        # a critical-level CPU rule existed.  Group rules into lists instead.
        self.rules = self._group_rules(rules)
        # Active alerts keyed "machine_id:rule_name", used for dedup.
        self.active_alerts: "dict[str, Alert]" = {}

    @staticmethod
    def _group_rules(rules: "list[AlertRule]") -> "dict[str, list[AlertRule]]":
        """Return the enabled rules bucketed by metric_type."""
        grouped: "dict[str, list[AlertRule]]" = {}
        for rule in rules:
            if rule.enabled:
                grouped.setdefault(rule.metric_type, []).append(rule)
        return grouped

    def evaluate(self, machine_id: str, metrics: "dict[str, float]") -> "list[Alert]":
        """Evaluate metrics against all matching rules.

        Returns only newly triggered alerts (not ones already active);
        alerts whose condition no longer holds are dropped from the
        active set so they can fire again later.
        """
        new_alerts: "list[Alert]" = []
        for metric_type, value in metrics.items():
            for rule in self.rules.get(metric_type, []):
                op_func = self.OPERATORS.get(rule.operator)
                if not op_func:
                    # Unknown operator string: skip rather than crash.
                    continue
                alert_key = f"{machine_id}:{rule.name}"
                if op_func(value, rule.threshold):
                    # Threshold exceeded - trigger only if not already active.
                    if alert_key not in self.active_alerts:
                        alert = Alert(
                            rule=rule,
                            machine_id=machine_id,
                            value=value,
                            # NOTE(review): naive UTC, matching the existing
                            # DB schema; utcnow() is deprecated in 3.12 -
                            # migrate with the alerts.time column type.
                            triggered_at=datetime.utcnow(),
                        )
                        self.active_alerts[alert_key] = alert
                        new_alerts.append(alert)
                elif alert_key in self.active_alerts:
                    # Condition cleared - resolve the active alert.
                    del self.active_alerts[alert_key]
        return new_alerts

    def update_rules(self, rules: "list[AlertRule]") -> None:
        """Replace the rule set (e.g. after a reload from the database)."""
        self.rules = self._group_rules(rules)
class AlertsService:
    """Main alerts service.

    Subscribes to ``metrics.raw`` events, evaluates each machine's metrics
    against the configured alert rules, persists triggered alerts to
    TimescaleDB (best-effort) and republishes them on ``alerts.<severity>``
    topics for downstream consumers (e.g. notifications).
    """

    def __init__(self):
        # Configuration and structured logger come from the shared package.
        self.config = get_alerts_config()
        self.logger = setup_logging(
            service_name=self.config.service_name,
            log_level=self.config.log_level,
            log_format=self.config.log_format,
        )
        self.running = False
        # Pool stays None when the database is unreachable; the service then
        # degrades to default rules and skips alert persistence.
        self.db_pool: asyncpg.Pool | None = None
        # Created in run() after rules are loaded.
        self.evaluator: AlertEvaluator | None = None
        self.subscriber = get_subscriber(topics=["metrics.raw"])
        self.publisher = get_publisher(source="alerts")

    async def connect_db(self) -> None:
        """Connect to TimescaleDB for rules and alert storage.

        Connection failure is logged but not fatal: the service keeps
        running without a database (see ``load_rules`` / ``store_alert``).
        """
        try:
            self.db_pool = await asyncpg.create_pool(
                self.config.timescale_url,
                min_size=1,
                max_size=5,
            )
            self.logger.info("database_connected")
        except Exception as e:
            self.logger.warning("database_connection_failed", error=str(e))
            self.db_pool = None

    async def load_rules(self) -> list[AlertRule]:
        """Load alert rules from database.

        Falls back to hard-coded CPU/memory/disk warning+critical defaults
        when no database connection is available.
        """
        if not self.db_pool:
            # Return default rules if no database
            return [
                AlertRule(
                    1, "High CPU Usage", "CPU_PERCENT", "gt", 80.0, "warning", True
                ),
                AlertRule(
                    2, "Critical CPU Usage", "CPU_PERCENT", "gt", 95.0, "critical", True
                ),
                AlertRule(
                    3,
                    "High Memory Usage",
                    "MEMORY_PERCENT",
                    "gt",
                    85.0,
                    "warning",
                    True,
                ),
                AlertRule(
                    4,
                    "Critical Memory Usage",
                    "MEMORY_PERCENT",
                    "gt",
                    95.0,
                    "critical",
                    True,
                ),
                AlertRule(
                    5, "High Disk Usage", "DISK_PERCENT", "gt", 80.0, "warning", True
                ),
                AlertRule(
                    6,
                    "Critical Disk Usage",
                    "DISK_PERCENT",
                    "gt",
                    90.0,
                    "critical",
                    True,
                ),
            ]
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT id, name, metric_type, operator, threshold, severity, enabled FROM alert_rules"
            )
            return [
                AlertRule(
                    id=row["id"],
                    name=row["name"],
                    metric_type=row["metric_type"],
                    operator=row["operator"],
                    # NOTE(review): asyncpg returns NUMERIC columns as
                    # Decimal - confirm the threshold column is a float
                    # type so comparisons stay float/float.
                    threshold=row["threshold"],
                    severity=row["severity"],
                    enabled=row["enabled"],
                )
                for row in rows
            ]

    async def store_alert(self, alert: Alert) -> None:
        """Store triggered alert in database (best-effort; no-op without a pool)."""
        if not self.db_pool:
            return
        try:
            async with self.db_pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO alerts (time, machine_id, rule_id, rule_name, metric_type, value, threshold, severity)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                    """,
                    alert.triggered_at,
                    alert.machine_id,
                    alert.rule.id,
                    alert.rule.name,
                    alert.rule.metric_type,
                    alert.value,
                    alert.rule.threshold,
                    alert.rule.severity,
                )
        except Exception as e:
            # Persistence failure must not stop alert publishing.
            self.logger.warning("alert_storage_failed", error=str(e))

    async def publish_alert(self, alert: Alert) -> None:
        """Publish alert event for other services (e.g., notifications).

        Topic encodes the severity ("alerts.warning" / "alerts.critical")
        so consumers can subscribe selectively.
        """
        await self.publisher.publish(
            topic=f"alerts.{alert.rule.severity}",
            payload={
                "rule_name": alert.rule.name,
                "machine_id": alert.machine_id,
                "metric_type": alert.rule.metric_type,
                "value": alert.value,
                "threshold": alert.rule.threshold,
                "severity": alert.rule.severity,
                "triggered_at": alert.triggered_at.isoformat(),
            },
        )

    async def process_metrics(self, event_data: dict[str, Any]) -> None:
        """Process one incoming metrics event: evaluate, log, store, publish.

        ``event_data`` is the payload of a ``metrics.raw`` event; assumed to
        carry "machine_id" (str) and "metrics" ({metric_type: value}) keys -
        missing keys degrade to "unknown"/{} rather than raising.
        """
        if not self.evaluator:
            # run() has not finished start-up yet.
            return
        machine_id = event_data.get("machine_id", "unknown")
        metrics = event_data.get("metrics", {})
        alerts = self.evaluator.evaluate(machine_id, metrics)
        for alert in alerts:
            self.logger.warning(
                "alert_triggered",
                rule=alert.rule.name,
                machine_id=alert.machine_id,
                value=alert.value,
                threshold=alert.rule.threshold,
                severity=alert.rule.severity,
            )
            await self.store_alert(alert)
            await self.publish_alert(alert)

    async def run(self) -> None:
        """Main service loop.

        Start-up order matters: DB first (rules may come from it), then the
        evaluator, then the event-bus connections. Shutdown always
        disconnects the bus and closes the pool via ``finally``.
        """
        self.running = True
        self.logger.info("alerts_service_starting")
        # Connect to database
        await self.connect_db()
        # Load rules
        rules = await self.load_rules()
        self.evaluator = AlertEvaluator(rules)
        self.logger.info("rules_loaded", count=len(rules))
        # Connect to event bus
        await self.subscriber.connect()
        await self.publisher.connect()
        self.logger.info("alerts_service_started")
        try:
            # Process events until stop() flips the flag or we are cancelled.
            async for event in self.subscriber.consume():
                if not self.running:
                    break
                try:
                    await self.process_metrics(event.payload)
                except Exception as e:
                    # One bad event must not kill the consume loop.
                    self.logger.error("event_processing_error", error=str(e))
        except asyncio.CancelledError:
            self.logger.info("alerts_service_cancelled")
        finally:
            await self.subscriber.disconnect()
            await self.publisher.disconnect()
            if self.db_pool:
                await self.db_pool.close()
            self.logger.info("alerts_service_stopped")

    def stop(self) -> None:
        """Signal the service to stop (checked at the top of the consume loop)."""
        self.running = False
async def main():
    """Main entry point: run the alerts service until a shutdown signal.

    Registers SIGTERM/SIGINT handlers on the running event loop so the
    service's consume loop can exit gracefully instead of the process
    being killed mid-event.
    """
    service = AlertsService()
    # FIX: asyncio.get_event_loop() inside a coroutine is deprecated since
    # Python 3.10; get_running_loop() is the supported way to reach the
    # loop that asyncio.run() created for us.
    loop = asyncio.get_running_loop()

    def signal_handler():
        service.logger.info("shutdown_signal_received")
        service.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        # NOTE(review): add_signal_handler is not implemented on Windows
        # event loops - confirm the deployment target is POSIX.
        loop.add_signal_handler(sig, signal_handler)
    await service.run()


if __name__ == "__main__":
    asyncio.run(main())