diff --git a/services/aggregator/main.py b/services/aggregator/main.py
index 9d04649..9b7d3cc 100644
--- a/services/aggregator/main.py
+++ b/services/aggregator/main.py
@@ -133,13 +133,16 @@ class MetricsServicer(metrics_pb2_grpc.MetricsServiceServicer):
             key = f"{metric_type}:{','.join(f'{k}={v}' for k, v in labels.items())}"
             metrics_dict[key] = value
 
-        # Update Redis (current state)
-        await self.redis.update_machine_state(
-            machine_id=machine_id,
-            hostname=hostname,
-            metrics=metrics_dict,
-            timestamp_ms=timestamp_ms,
-        )
+        # Update Redis (current state) - don't fail stream if Redis is down
+        try:
+            await self.redis.update_machine_state(
+                machine_id=machine_id,
+                hostname=hostname,
+                metrics=metrics_dict,
+                timestamp_ms=timestamp_ms,
+            )
+        except Exception as e:
+            self.logger.warning("redis_update_failed", error=str(e))
 
         # Insert into TimescaleDB (historical)
         try:
@@ -161,16 +164,19 @@ class MetricsServicer(metrics_pb2_grpc.MetricsServiceServicer):
         except Exception as e:
             self.logger.warning("machine_registry_update_failed", error=str(e))
 
-        # Publish event for subscribers (alerts, gateway)
-        await self.publisher.publish(
-            topic="metrics.raw",
-            payload={
-                "machine_id": machine_id,
-                "hostname": hostname,
-                "timestamp_ms": timestamp_ms,
-                "metrics": metrics_dict,
-            },
-        )
+        # Publish event for subscribers (alerts, gateway) - don't fail stream
+        try:
+            await self.publisher.publish(
+                topic="metrics.raw",
+                payload={
+                    "machine_id": machine_id,
+                    "hostname": hostname,
+                    "timestamp_ms": timestamp_ms,
+                    "metrics": metrics_dict,
+                },
+            )
+        except Exception as e:
+            self.logger.warning("event_publish_failed", error=str(e))
 
         self.logger.debug(
             "batch_flushed",
diff --git a/shared/events/redis_pubsub.py b/shared/events/redis_pubsub.py
index 0bffe81..89f5594 100644
--- a/shared/events/redis_pubsub.py
+++ b/shared/events/redis_pubsub.py
@@ -106,6 +106,51 @@ class RedisPubSubSubscriber(EventSubscriber):
         self._topics.extend(topics)
 
+    async def _reconnect(self) -> None:
+        """Attempt to reconnect to Redis."""
+        max_retries = 10
+        base_delay = 1.0
+
+        for attempt in range(max_retries):
+            try:
+                # Clean up old connections
+                if self._pubsub:
+                    try:
+                        await self._pubsub.close()
+                    except Exception:
+                        pass
+                if self._client:
+                    try:
+                        await self._client.close()
+                    except Exception:
+                        pass
+
+                # Reconnect
+                self._client = redis.from_url(self.redis_url, decode_responses=True)
+                await self._client.ping()
+                self._pubsub = self._client.pubsub()
+
+                # Re-subscribe to topics
+                if self._topics:
+                    patterns = [t for t in self._topics if "*" in t]
+                    channels = [t for t in self._topics if "*" not in t]
+                    if channels:
+                        await self._pubsub.subscribe(*channels)
+                    if patterns:
+                        await self._pubsub.psubscribe(*patterns)
+
+                logger.info(f"Reconnected to Redis at {self.redis_url}")
+                return
+
+            except Exception as e:
+                delay = min(base_delay * (2**attempt), 30.0)
+                logger.warning(
+                    f"Reconnect attempt {attempt + 1} failed: {e}, retrying in {delay}s"
+                )
+                await asyncio.sleep(delay)
+
+        raise RuntimeError(f"Failed to reconnect to Redis after {max_retries} attempts")
+
     async def consume(self) -> AsyncIterator[Event]:
         if not self._pubsub:
             raise RuntimeError("Subscriber not connected")
@@ -138,5 +183,9 @@ class RedisPubSubSubscriber(EventSubscriber):
                     self._running = False
                     break
             except Exception as e:
-                logger.error(f"Error consuming events: {e}")
-                await asyncio.sleep(1.0)
+                logger.error(f"Error consuming events: {e}, attempting reconnect...")
+                try:
+                    await self._reconnect()
+                except Exception as reconnect_err:
+                    logger.error(f"Reconnect failed: {reconnect_err}")
+                    await asyncio.sleep(5.0)