"""Redis-based event bus for pipeline job progress.

Celery workers push events, SSE endpoints poll them.
Only depends on redis -- safe to import from any context.
"""

import json
import os

import redis

# Connection string; defaults to a local Redis instance, DB 0.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

# How long a job's event list survives after its most recent push (seconds).
EVENT_TTL_SECONDS = 3600

# Lazily-created client shared by all calls in this process.  redis-py
# clients are thread-safe and own an internal connection pool, so building
# a fresh client per call (as the previous version did) created a new pool
# on every push/poll.  Lazy creation keeps import side-effect free and
# plays well with Celery's fork-per-worker model (client is made post-fork,
# on first use).
_redis_client = None


def _get_redis():
    """Return the process-wide Redis client, creating it on first use."""
    global _redis_client
    if _redis_client is None:
        _redis_client = redis.from_url(REDIS_URL, decode_responses=True)
    return _redis_client


def _events_key(job_id: str) -> str:
    """Redis list key holding the event stream for *job_id*."""
    return f"chunk_events:{job_id}"


def push_event(job_id: str, event_type: str, data: dict) -> None:
    """Append an event to the job's Redis list and refresh its TTL.

    The stored JSON payload is ``data`` plus an ``"event"`` field.
    ``event_type`` is merged last so a stray ``"event"`` key inside
    ``data`` cannot silently override the declared type (the previous
    merge order allowed exactly that).

    Args:
        job_id: Pipeline job identifier; namespaces the Redis key.
        event_type: Event discriminator stored under the ``"event"`` key.
        data: JSON-serializable payload fields for the event.
    """
    r = _get_redis()
    key = _events_key(job_id)
    event = json.dumps({**data, "event": event_type})
    # Pipeline push + expire so both commands hit the server in a single
    # round trip; TTL is refreshed on every push, so the list lives for
    # EVENT_TTL_SECONDS after the *last* event.
    with r.pipeline() as pipe:
        pipe.rpush(key, event)
        pipe.expire(key, EVENT_TTL_SECONDS)
        pipe.execute()


def poll_events(job_id: str, cursor: int = 0) -> tuple[list[dict], int]:
    """Return events appended at or after *cursor*, plus the new cursor.

    Malformed entries are omitted from the returned list but still
    advance the cursor (it indexes raw list positions, not parsed
    events), so one corrupt payload cannot wedge the stream.

    Args:
        job_id: Pipeline job identifier; namespaces the Redis key.
        cursor: Raw list index to resume from (0 = start of stream).

    Returns:
        ``(events, new_cursor)`` where ``events`` are the parsed dicts
        and ``new_cursor`` should be passed to the next poll.
    """
    r = _get_redis()
    raw_events = r.lrange(_events_key(job_id), cursor, -1)
    parsed: list[dict] = []
    for raw in raw_events:
        try:
            parsed.append(json.loads(raw))
        except (json.JSONDecodeError, TypeError):
            pass  # skip corrupt entry; cursor still advances past it
    return parsed, cursor + len(raw_events)