"""Collector service - streams system metrics to the aggregator via gRPC."""

import asyncio
import signal
import sys
import time
from pathlib import Path

import grpc

# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from services.collector.metrics import MetricsCollector  # noqa: E402
from shared import metrics_pb2, metrics_pb2_grpc  # noqa: E402
from shared.config import get_collector_config  # noqa: E402
from shared.logging import setup_logging  # noqa: E402


class CollectorService:
    """Main collector service that streams metrics to the aggregator.

    ``run()`` connects, streams metrics until ``stop()`` is called (or the
    retry budget is exhausted), and always closes the channel on the way out.
    """

    # Retry policy for the metrics stream.
    _MAX_RETRIES = 10
    _BASE_RETRY_DELAY_S = 1.0
    _MAX_RETRY_DELAY_S = 60.0
    # A stream that stayed up at least this long before failing is treated as
    # having been healthy: its failure starts a fresh retry budget instead of
    # adding to failures left over from earlier, unrelated outages.
    _HEALTHY_STREAM_S = 60.0

    def __init__(self):
        self.config = get_collector_config()
        self.logger = setup_logging(
            service_name=self.config.service_name,
            log_level=self.config.log_level,
            log_format=self.config.log_format,
        )

        self.running = False
        self.channel: grpc.aio.Channel | None = None
        self.stub: metrics_pb2_grpc.MetricsServiceStub | None = None
        # Loop and event used by stop() to wake pending interval/backoff
        # sleeps. Both are created in run() so they belong to the loop that
        # actually runs the service.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._stop_event: asyncio.Event | None = None

        self.collector = MetricsCollector(
            machine_id=self.config.machine_id,
            collect_cpu=self.config.collect_cpu,
            collect_memory=self.config.collect_memory,
            collect_disk=self.config.collect_disk,
            collect_network=self.config.collect_network,
            collect_load=self.config.collect_load,
        )

    async def connect(self) -> None:
        """Establish connection to the aggregator.

        Creates an insecure channel with keepalive options and a stub, then
        waits up to 10 seconds for the channel to become ready.

        Raises:
            asyncio.TimeoutError: if the channel is not ready within 10 s.
                ``self.channel`` and ``self.stub`` remain assigned in that
                case, so ``disconnect()`` can still close the channel.
        """
        self.logger.info(
            "connecting_to_aggregator",
            aggregator_url=self.config.aggregator_url,
        )

        self.channel = grpc.aio.insecure_channel(
            self.config.aggregator_url,
            options=[
                ("grpc.keepalive_time_ms", 10000),
                ("grpc.keepalive_timeout_ms", 5000),
                ("grpc.keepalive_permit_without_calls", True),
            ],
        )
        self.stub = metrics_pb2_grpc.MetricsServiceStub(self.channel)

        # Wait for channel to be ready
        try:
            await asyncio.wait_for(
                self.channel.channel_ready(),
                timeout=10.0,
            )
            self.logger.info("connected_to_aggregator")
        except asyncio.TimeoutError:
            self.logger.error("connection_timeout")
            raise

    async def disconnect(self) -> None:
        """Close connection to the aggregator (no-op if not connected)."""
        if self.channel:
            await self.channel.close()
            self.channel = None
            self.stub = None
            self.logger.info("disconnected_from_aggregator")

    def _batch_to_proto(self, batch) -> list[metrics_pb2.Metric]:
        """Convert a MetricsBatch to protobuf messages.

        Each entry of ``batch.metrics`` becomes one ``metrics_pb2.Metric``
        stamped with the batch-level machine_id, hostname and timestamp.
        ``metric.metric_type`` is looked up by name on ``metrics_pb2``; names
        that are not found fall back to enum value 0.
        """
        return [
            metrics_pb2.Metric(
                machine_id=batch.machine_id,
                hostname=batch.hostname,
                timestamp_ms=batch.timestamp_ms,
                type=getattr(metrics_pb2, metric.metric_type, 0),
                value=metric.value,
                labels=metric.labels,
            )
            for metric in batch.metrics
        ]

    async def _wait(self, delay: float) -> None:
        """Sleep for ``delay`` seconds, returning early once stop() is called."""
        if self._stop_event is None:
            # stream_metrics() used without run(): no stop event to wait on.
            await asyncio.sleep(delay)
            return
        try:
            await asyncio.wait_for(self._stop_event.wait(), timeout=delay)
        except asyncio.TimeoutError:
            # Normal case: the full delay elapsed without a stop request.
            pass

    async def _metric_generator(self):
        """Async generator that yields metrics at the configured interval.

        Collects one batch per iteration and yields its protos until
        ``self.running`` becomes False. The inter-batch wait ends early on
        stop(), so the request stream closes promptly on shutdown.
        """
        while self.running:
            batch = self.collector.collect()
            protos = self._batch_to_proto(batch)

            for proto in protos:
                yield proto

            self.logger.debug(
                "collected_metrics",
                count=len(protos),
                machine_id=batch.machine_id,
            )

            await self._wait(self.config.collection_interval)

    async def stream_metrics(self) -> None:
        """Stream metrics to the aggregator, reconnecting with backoff.

        Each failed stream is followed by an exponential backoff (starting at
        ``_BASE_RETRY_DELAY_S``, capped at ``_MAX_RETRY_DELAY_S``) and a
        reconnect. The retry counter resets after a stream completes, or when
        the failed stream had stayed up for at least ``_HEALTHY_STREAM_S``.

        Raises:
            RuntimeError: if called before ``connect()``.
            grpc.aio.AioRpcError: after ``_MAX_RETRIES`` consecutive failures.
            asyncio.CancelledError: re-raised if the task is cancelled.
        """
        if self.stub is None:
            raise RuntimeError("Not connected to aggregator")

        retry_count = 0

        while self.running:
            started = time.monotonic()
            try:
                self.logger.info("starting_metric_stream")
                response = await self.stub.StreamMetrics(self._metric_generator())
            except grpc.aio.AioRpcError as e:
                if time.monotonic() - started >= self._HEALTHY_STREAM_S:
                    # The stream was healthy for a while; this is a new outage.
                    retry_count = 0
                retry_count += 1
                # First retry waits exactly the base delay, then doubles.
                delay = min(
                    self._BASE_RETRY_DELAY_S * 2 ** (retry_count - 1),
                    self._MAX_RETRY_DELAY_S,
                )
                self.logger.warning(
                    "stream_error",
                    code=e.code().name,
                    details=e.details(),
                    retry_count=retry_count,
                    retry_delay=delay,
                )

                if retry_count >= self._MAX_RETRIES:
                    self.logger.error("max_retries_exceeded")
                    raise

                await self._wait(delay)
                if not self.running:
                    # stop() arrived during backoff; don't bother reconnecting.
                    break

                # Reconnect
                try:
                    await self.disconnect()
                    await self.connect()
                except Exception as conn_err:
                    # Best-effort: the next loop iteration retries the stream.
                    self.logger.error("reconnect_failed", error=str(conn_err))
            except asyncio.CancelledError:
                self.logger.info("stream_cancelled")
                # Propagate cancellation so callers awaiting this task see it.
                raise
            else:
                self.logger.info(
                    "stream_completed",
                    success=response.success,
                    metrics_received=response.metrics_received,
                    message=response.message,
                )
                retry_count = 0

    async def run(self) -> None:
        """Main entry point for the collector service.

        Connects, streams until stopped, and always disconnects afterwards --
        including when the initial connection attempt fails.
        """
        self.running = True
        self._loop = asyncio.get_running_loop()
        self._stop_event = asyncio.Event()
        self.logger.info(
            "collector_starting",
            machine_id=self.config.machine_id,
            interval=self.config.collection_interval,
        )

        # Initial CPU percent call to initialize (first call always returns 0)
        import psutil

        psutil.cpu_percent()

        try:
            # connect() lives inside the try so a timed-out initial connect
            # still has its channel closed by disconnect() below.
            await self.connect()
            await self.stream_metrics()
        finally:
            await self.disconnect()
            self.logger.info("collector_stopped")

    def stop(self) -> None:
        """Signal the collector to stop.

        Clears ``running`` and wakes any pending interval/backoff wait so the
        stream loop exits without sleeping out the full delay. Safe to call
        from any thread.
        """
        self.running = False
        loop, event = self._loop, self._stop_event
        if loop is not None and event is not None and not loop.is_closed():
            # asyncio.Event is not thread-safe; hand the set() to the loop.
            loop.call_soon_threadsafe(event.set)


async def main():
    """Main entry point."""
    service = CollectorService()

    # Handle shutdown signals
    loop = asyncio.get_running_loop()

    def signal_handler():
        service.logger.info("shutdown_signal_received")
        service.stop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    await service.run()


if __name__ == "__main__":
    asyncio.run(main())