"""Abstract base classes for event publishing and subscribing.""" from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from typing import Any, AsyncIterator import uuid @dataclass class Event: """Standard event envelope.""" topic: str payload: dict[str, Any] event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=datetime.utcnow) source: str = "" def to_dict(self) -> dict[str, Any]: return { "event_id": self.event_id, "topic": self.topic, "timestamp": self.timestamp.isoformat(), "source": self.source, "payload": self.payload, } @classmethod def from_dict(cls, data: dict[str, Any]) -> "Event": return cls( event_id=data.get("event_id", str(uuid.uuid4())), topic=data["topic"], timestamp=datetime.fromisoformat(data["timestamp"]) if "timestamp" in data else datetime.utcnow(), source=data.get("source", ""), payload=data.get("payload", {}), ) class EventPublisher(ABC): """Abstract base for event publishers.""" @abstractmethod async def connect(self) -> None: """Establish connection to the message broker.""" pass @abstractmethod async def disconnect(self) -> None: """Close connection to the message broker.""" pass @abstractmethod async def publish(self, topic: str, payload: dict[str, Any], **kwargs) -> str: """ Publish an event to a topic. Args: topic: The topic/channel to publish to payload: The event data **kwargs: Additional options (e.g., headers, partition key) Returns: The event ID """ pass async def publish_event(self, event: Event) -> str: """Publish a pre-constructed Event object.""" return await self.publish(event.topic, event.payload, event_id=event.event_id) async def __aenter__(self) -> "EventPublisher": await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.disconnect() class EventSubscriber(ABC): """Abstract base for event subscribers.""" @abstractmethod async def connect(self) -> None: """Establish connection to the message broker.""" pass @abstractmethod async def disconnect(self) -> None: """Close connection and unsubscribe.""" pass @abstractmethod async def subscribe(self, topics: list[str]) -> None: """ Subscribe to one or more topics. Args: topics: List of topics/patterns to subscribe to """ pass @abstractmethod async def consume(self) -> AsyncIterator[Event]: """ Async generator that yields events as they arrive. Yields: Event objects """ pass async def __aenter__(self) -> "EventSubscriber": await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: await self.disconnect()