102 lines
3.1 KiB
Python
102 lines
3.1 KiB
Python
"""Factory functions for creating event publishers and subscribers."""
|
|
|
|
import os
|
|
from enum import Enum
|
|
|
|
from .base import EventPublisher, EventSubscriber
|
|
from .redis_pubsub import RedisPubSubPublisher, RedisPubSubSubscriber
|
|
|
|
|
|
class EventBackend(str, Enum):
    """Names of the event transport backends the factories recognise.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so a backend name read from configuration can be turned into a
    member with ``EventBackend(name)``.
    """

    # The only backend the factory functions in this module instantiate.
    REDIS_PUBSUB = "redis_pubsub"

    # Reserved for future use: the factory functions currently raise
    # NotImplementedError when either of these is selected.
    REDIS_STREAMS = "redis_streams"
    KAFKA = "kafka"
|
|
|
|
|
|
def get_publisher(
    backend: EventBackend | str | None = None,
    source: str = "",
    **kwargs,
) -> EventPublisher:
    """
    Factory function to get an event publisher.

    Args:
        backend: The event backend to use (default: from EVENTS_BACKEND env
            var or redis_pubsub). Strings are converted to ``EventBackend``.
        source: Identifier for the source service
        **kwargs: Backend-specific options. This function only reads
            ``redis_url``; any other key is ignored here.

    Returns:
        An EventPublisher instance

    Raises:
        ValueError: If ``backend`` is a string that is not a valid
            ``EventBackend`` value, or is otherwise not a known backend.
        NotImplementedError: If the selected backend is ``redis_streams``
            or ``kafka``.

    Environment variables:
        EVENTS_BACKEND: Default backend (redis_pubsub, redis_streams, kafka)
        REDIS_URL: Redis connection URL
        KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (future)
    """
    if backend is None:
        # os.getenv() only uses its default when the variable is absent.
        # A variable that is set but empty (e.g. "EVENTS_BACKEND=" in an env
        # file) must fall back to the default too, not fail enum lookup.
        backend = os.getenv("EVENTS_BACKEND") or EventBackend.REDIS_PUBSUB

    # EventBackend members are themselves str instances; passing one through
    # the constructor simply returns the same member.
    if isinstance(backend, str):
        backend = EventBackend(backend)

    if backend == EventBackend.REDIS_PUBSUB:
        # Precedence: explicit kwarg, then environment, then local default.
        # Empty values at any level are treated as "not provided".
        redis_url = (
            kwargs.get("redis_url")
            or os.getenv("REDIS_URL")
            or "redis://localhost:6379"
        )
        return RedisPubSubPublisher(redis_url=redis_url, source=source)

    if backend == EventBackend.REDIS_STREAMS:
        raise NotImplementedError("Redis Streams backend not yet implemented")

    if backend == EventBackend.KAFKA:
        raise NotImplementedError("Kafka backend not yet implemented")

    # Reached when backend is neither a str nor a known EventBackend member.
    raise ValueError(f"Unknown event backend: {backend}")
|
|
|
|
|
|
def get_subscriber(
    topics: list[str] | None = None,
    backend: EventBackend | str | None = None,
    **kwargs,
) -> EventSubscriber:
    """
    Factory function to get an event subscriber.

    Args:
        topics: Topics to subscribe to; passed to the subscriber unchanged
        backend: The event backend to use (default: from EVENTS_BACKEND env
            var or redis_pubsub). Strings are converted to ``EventBackend``.
        **kwargs: Backend-specific options. This function only reads
            ``redis_url``; any other key is ignored here.

    Returns:
        An EventSubscriber instance

    Raises:
        ValueError: If ``backend`` is a string that is not a valid
            ``EventBackend`` value, or is otherwise not a known backend.
        NotImplementedError: If the selected backend is ``redis_streams``
            or ``kafka``.

    Environment variables:
        EVENTS_BACKEND: Default backend (redis_pubsub, redis_streams, kafka)
        REDIS_URL: Redis connection URL
        KAFKA_BOOTSTRAP_SERVERS: Kafka bootstrap servers (future)
    """
    if backend is None:
        # os.getenv() only uses its default when the variable is absent.
        # A variable that is set but empty (e.g. "EVENTS_BACKEND=" in an env
        # file) must fall back to the default too, not fail enum lookup.
        backend = os.getenv("EVENTS_BACKEND") or EventBackend.REDIS_PUBSUB

    # EventBackend members are themselves str instances; passing one through
    # the constructor simply returns the same member.
    if isinstance(backend, str):
        backend = EventBackend(backend)

    if backend == EventBackend.REDIS_PUBSUB:
        # Precedence: explicit kwarg, then environment, then local default.
        # Empty values at any level are treated as "not provided".
        redis_url = (
            kwargs.get("redis_url")
            or os.getenv("REDIS_URL")
            or "redis://localhost:6379"
        )
        return RedisPubSubSubscriber(redis_url=redis_url, topics=topics)

    if backend == EventBackend.REDIS_STREAMS:
        raise NotImplementedError("Redis Streams backend not yet implemented")

    if backend == EventBackend.KAFKA:
        raise NotImplementedError("Kafka backend not yet implemented")

    # Reached when backend is neither a str nor a known EventBackend member.
    raise ValueError(f"Unknown event backend: {backend}")
|