Files
sysmonstm/services/collector/main.py
2025-12-29 23:44:30 -03:00

210 lines
6.3 KiB
Python

"""Collector service - streams system metrics to the aggregator via gRPC."""
import asyncio
import signal
import sys
from pathlib import Path
import grpc
# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from services.collector.metrics import MetricsCollector
from shared import metrics_pb2, metrics_pb2_grpc
from shared.config import get_collector_config
from shared.logging import setup_logging
class CollectorService:
"""Main collector service that streams metrics to the aggregator."""
def __init__(self):
self.config = get_collector_config()
self.logger = setup_logging(
service_name=self.config.service_name,
log_level=self.config.log_level,
log_format=self.config.log_format,
)
self.running = False
self.channel: grpc.aio.Channel | None = None
self.stub: metrics_pb2_grpc.MetricsServiceStub | None = None
self.collector = MetricsCollector(
machine_id=self.config.machine_id,
collect_cpu=self.config.collect_cpu,
collect_memory=self.config.collect_memory,
collect_disk=self.config.collect_disk,
collect_network=self.config.collect_network,
collect_load=self.config.collect_load,
)
async def connect(self) -> None:
"""Establish connection to the aggregator."""
self.logger.info(
"connecting_to_aggregator",
aggregator_url=self.config.aggregator_url,
)
self.channel = grpc.aio.insecure_channel(
self.config.aggregator_url,
options=[
("grpc.keepalive_time_ms", 10000),
("grpc.keepalive_timeout_ms", 5000),
("grpc.keepalive_permit_without_calls", True),
],
)
self.stub = metrics_pb2_grpc.MetricsServiceStub(self.channel)
# Wait for channel to be ready
try:
await asyncio.wait_for(
self.channel.channel_ready(),
timeout=10.0,
)
self.logger.info("connected_to_aggregator")
except asyncio.TimeoutError:
self.logger.error("connection_timeout")
raise
async def disconnect(self) -> None:
"""Close connection to the aggregator."""
if self.channel:
await self.channel.close()
self.channel = None
self.stub = None
self.logger.info("disconnected_from_aggregator")
def _batch_to_proto(self, batch) -> list[metrics_pb2.Metric]:
"""Convert a MetricsBatch to protobuf messages."""
protos = []
for metric in batch.metrics:
proto = metrics_pb2.Metric(
machine_id=batch.machine_id,
hostname=batch.hostname,
timestamp_ms=batch.timestamp_ms,
type=getattr(metrics_pb2, metric.metric_type, 0),
value=metric.value,
labels=metric.labels,
)
protos.append(proto)
return protos
async def _metric_generator(self):
"""Async generator that yields metrics at the configured interval."""
while self.running:
batch = self.collector.collect()
protos = self._batch_to_proto(batch)
for proto in protos:
yield proto
self.logger.debug(
"collected_metrics",
count=len(protos),
machine_id=batch.machine_id,
)
await asyncio.sleep(self.config.collection_interval)
async def stream_metrics(self) -> None:
"""Stream metrics to the aggregator."""
if not self.stub:
raise RuntimeError("Not connected to aggregator")
retry_count = 0
max_retries = 10
base_delay = 1.0
while self.running:
try:
self.logger.info("starting_metric_stream")
response = await self.stub.StreamMetrics(self._metric_generator())
self.logger.info(
"stream_completed",
success=response.success,
metrics_received=response.metrics_received,
message=response.message,
)
retry_count = 0
except grpc.aio.AioRpcError as e:
retry_count += 1
delay = min(base_delay * (2**retry_count), 60.0)
self.logger.warning(
"stream_error",
code=e.code().name,
details=e.details(),
retry_count=retry_count,
retry_delay=delay,
)
if retry_count >= max_retries:
self.logger.error("max_retries_exceeded")
raise
await asyncio.sleep(delay)
# Reconnect
try:
await self.disconnect()
await self.connect()
except Exception as conn_err:
self.logger.error("reconnect_failed", error=str(conn_err))
except asyncio.CancelledError:
self.logger.info("stream_cancelled")
break
async def run(self) -> None:
"""Main entry point for the collector service."""
self.running = True
self.logger.info(
"collector_starting",
machine_id=self.config.machine_id,
interval=self.config.collection_interval,
)
# Initial CPU percent call to initialize (first call always returns 0)
import psutil
psutil.cpu_percent()
await self.connect()
try:
await self.stream_metrics()
finally:
await self.disconnect()
self.logger.info("collector_stopped")
def stop(self) -> None:
"""Signal the collector to stop."""
self.running = False
async def main():
    """Construct the service, wire shutdown signals, and run until stopped."""
    service = CollectorService()
    # BUGFIX: asyncio.get_event_loop() is deprecated inside a coroutine;
    # get_running_loop() is the correct call here (we are always inside
    # the loop started by asyncio.run()).
    loop = asyncio.get_running_loop()

    def signal_handler():
        service.logger.info("shutdown_signal_received")
        service.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # add_signal_handler is unavailable on Windows event loops;
            # fall back to the interpreter's default signal behavior.
            pass

    await service.run()


if __name__ == "__main__":
    asyncio.run(main())