diff --git a/core/api/detect_sse.py b/core/api/detect_sse.py new file mode 100644 index 0000000..309c7ce --- /dev/null +++ b/core/api/detect_sse.py @@ -0,0 +1,72 @@ +""" +SSE endpoint for detection pipeline events. + +Uses Redis as the event bus between pipeline workers and the SSE stream. +Mirrors chunker_sse.py but polls detect_events:{job_id}. + +GET /detect/stream/{job_id} → text/event-stream +""" + +import asyncio +import json +import logging +import time +from typing import AsyncGenerator + +from fastapi import APIRouter +from starlette.responses import StreamingResponse + +from core.events import poll_events +from detect.events import DETECT_EVENTS_PREFIX, TERMINAL_EVENTS + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/detect", tags=["detect"]) + + +async def _event_generator(job_id: str) -> AsyncGenerator[str, None]: + cursor = 0 + timeout = time.monotonic() + 3600 # 1 hour max (detection jobs are long) + + while time.monotonic() < timeout: + events, cursor = poll_events(job_id, cursor, prefix=DETECT_EVENTS_PREFIX) + + if not events: + yield f"event: waiting\ndata: {json.dumps({'job_id': job_id})}\n\n" + await asyncio.sleep(0.1) + continue + + for data in events: + event_type = data.pop("event", "update") + payload = {**data, "job_id": job_id} + + yield f"event: {event_type}\ndata: {json.dumps(payload)}\n\n" + + if event_type in TERMINAL_EVENTS: + yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n" + return + + await asyncio.sleep(0.05) + + yield f"event: timeout\ndata: {json.dumps({'job_id': job_id})}\n\n" + + +@router.get("/stream/{job_id}") +async def stream_detect_job(job_id: str): + """ + SSE stream for a detection pipeline job. + + The UI connects via native EventSource: + const es = new EventSource('/api/detect/stream/'); + es.addEventListener('graph_update', (e) => { ... }); + es.addEventListener('detection', (e) => { ... 
}); + """ + return StreamingResponse( + _event_generator(job_id), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/core/api/main.py b/core/api/main.py index 8d62cf8..fa307e1 100644 --- a/core/api/main.py +++ b/core/api/main.py @@ -24,6 +24,7 @@ from fastapi.middleware.cors import CORSMiddleware from strawberry.fastapi import GraphQLRouter from core.api.chunker_sse import router as chunker_router +from core.api.detect_sse import router as detect_router from core.api.graphql import schema as graphql_schema CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "") @@ -52,6 +53,9 @@ app.include_router(graphql_router, prefix="/graphql") # Chunker SSE app.include_router(chunker_router) +# Detection SSE +app.include_router(detect_router) + @app.get("/") def root(): diff --git a/core/events.py b/core/events.py index 5ea6980..a6b30cb 100644 --- a/core/events.py +++ b/core/events.py @@ -17,19 +17,23 @@ def _get_redis(): return redis.from_url(REDIS_URL, decode_responses=True) -def push_event(job_id: str, event_type: str, data: dict) -> None: +def push_event( + job_id: str, event_type: str, data: dict, prefix: str = "chunk_events" +) -> None: """Push an event to the Redis list for a job.""" r = _get_redis() - key = f"chunk_events:{job_id}" + key = f"{prefix}:{job_id}" event = json.dumps({"event": event_type, **data}) r.rpush(key, event) r.expire(key, 3600) -def poll_events(job_id: str, cursor: int = 0) -> tuple[list[dict], int]: +def poll_events( + job_id: str, cursor: int = 0, prefix: str = "chunk_events" +) -> tuple[list[dict], int]: """Poll new events from Redis. 
Returns (events, new_cursor).""" r = _get_redis() - key = f"chunk_events:{job_id}" + key = f"{prefix}:{job_id}" raw_events = r.lrange(key, cursor, -1) parsed = [] for raw in raw_events: diff --git a/core/schema/modelgen.json b/core/schema/modelgen.json index dcf1c6d..9df4022 100644 --- a/core/schema/modelgen.json +++ b/core/schema/modelgen.json @@ -20,6 +20,16 @@ "target": "protobuf", "output": "core/rpc/protos/worker.proto", "include": ["grpc"] + }, + { + "target": "pydantic", + "output": "detect/sse_contract.py", + "include": ["detect_views"] + }, + { + "target": "typescript", + "output": "ui/detection-app/src/types/sse-contract.ts", + "include": ["detect_views"] } ] } diff --git a/core/schema/models/__init__.py b/core/schema/models/__init__.py index d62b172..8b6266d 100644 --- a/core/schema/models/__init__.py +++ b/core/schema/models/__init__.py @@ -28,6 +28,7 @@ from .grpc import ( from .jobs import ChunkJob, ChunkJobStatus, JobStatus, TranscodeJob from .media import AssetStatus, MediaAsset from .presets import BUILTIN_PRESETS, TranscodePreset +from .detect import DETECT_VIEWS # noqa: F401 — discovered by modelgen generic loader from .views import ChunkEvent, ChunkOutputFile, PipelineStats, WorkerEvent # Core domain models - generates Django, Pydantic, TypeScript @@ -50,6 +51,7 @@ ENUMS = [AssetStatus, JobStatus, ChunkJobStatus] # View/event models - generates TypeScript for UI consumption VIEWS = [ChunkEvent, WorkerEvent, PipelineStats, ChunkOutputFile] + # gRPC messages - generates Proto GRPC_MESSAGES = [ JobRequest, diff --git a/core/schema/models/detect.py b/core/schema/models/detect.py new file mode 100644 index 0000000..4e373c6 --- /dev/null +++ b/core/schema/models/detect.py @@ -0,0 +1,166 @@ +""" +Detection Pipeline Schema Definitions + +Source of truth for all detection SSE events and wire-format models. 
+Generates: Pydantic (detect/sse_contract.py), TypeScript (ui/detection-app/src/types/sse-contract.ts) + +Pipeline-internal models that never cross the wire (e.g. Frame with np.ndarray) +live in detect/models.py and are NOT generated. +""" + +from dataclasses import dataclass, field +from typing import List, Literal, Optional + + +# --- Enums as Literal unions (wire format, not Python Enum) --- + +NodeStatus = Literal["idle", "processing", "completed", "error"] +DetectionSource = Literal["ocr", "local_vlm", "cloud_llm", "logo_match", "auxiliary"] +LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"] + + +# --- Nested components --- + + +@dataclass +class GraphNode: + """A pipeline stage node.""" + + id: str + status: str = "idle" # NodeStatus + items_in: int = 0 + items_out: int = 0 + + +@dataclass +class GraphEdge: + """An edge between pipeline stages.""" + + source: str + target: str + throughput: int = 0 + + +@dataclass +class BoundingBoxEvent: + """Bounding box in SSE event payloads.""" + + x: int + y: int + w: int + h: int + confidence: float + label: str + resolved_brand: Optional[str] = None + source: Optional[str] = None + + +@dataclass +class BrandSummary: + """Per-brand stats in the final report.""" + + brand: str + total_appearances: int = 0 + total_screen_time: float = 0.0 + avg_confidence: float = 0.0 + first_seen: float = 0.0 + last_seen: float = 0.0 + + +# --- SSE event payloads --- + + +@dataclass +class GraphUpdate: + """Pipeline node state transition. SSE event: graph_update""" + + nodes: List[GraphNode] = field(default_factory=list) + edges: List[GraphEdge] = field(default_factory=list) + active_path: List[str] = field(default_factory=list) + + +@dataclass +class StatsUpdate: + """Funnel statistics snapshot. 
SSE event: stats_update""" + + frames_extracted: int = 0 + frames_after_scene_filter: int = 0 + regions_detected: int = 0 + regions_resolved_by_ocr: int = 0 + regions_escalated_to_local_vlm: int = 0 + regions_escalated_to_cloud_llm: int = 0 + cloud_llm_calls: int = 0 + processing_time_seconds: float = 0.0 + estimated_cloud_cost_usd: float = 0.0 + + +@dataclass +class FrameUpdate: + """Current frame being processed. SSE event: frame_update""" + + frame_ref: int + timestamp: float + jpeg_b64: str + boxes: List[BoundingBoxEvent] = field(default_factory=list) + + +@dataclass +class Detection: + """A confirmed brand detection. SSE event: detection""" + + brand: str + timestamp: float + duration: float + confidence: float + source: str # DetectionSource + content_type: str + bbox: Optional[BoundingBoxEvent] = None + frame_ref: Optional[int] = None + + +@dataclass +class LogEvent: + """Pipeline log line. SSE event: log""" + + level: str # LogLevel + stage: str + msg: str + ts: str + trace_id: Optional[str] = None + + +@dataclass +class DetectionReportSummary: + """Final detection report summary.""" + + video_source: str + content_type: str + duration_seconds: float + total_detections: int = 0 + brands: List[BrandSummary] = field(default_factory=list) + stats: Optional[StatsUpdate] = None + + +@dataclass +class JobComplete: + """Final report when pipeline finishes. 
SSE event: job_complete""" + + job_id: str + report: Optional[DetectionReportSummary] = None + + +# --- Export lists for modelgen --- + +DETECT_VIEWS = [ + GraphNode, + GraphEdge, + BoundingBoxEvent, + BrandSummary, + GraphUpdate, + StatsUpdate, + FrameUpdate, + Detection, + LogEvent, + DetectionReportSummary, + JobComplete, +] diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml index 47c9ae2..e3d562f 100644 --- a/ctrl/docker-compose.yml +++ b/ctrl/docker-compose.yml @@ -190,6 +190,20 @@ services: - ../ui/chunker/vite.config.ts:/app/vite.config.ts - ../ui/common:/common + detection: + build: + context: ../ui/detection-app + dockerfile: Dockerfile + ports: + - "5175:5175" + environment: + VITE_ALLOWED_HOSTS: ${VITE_ALLOWED_HOSTS:-} + volumes: + - ../ui/detection-app/src:/app/src + - ../ui/detection-app/vite.config.ts:/app/vite.config.ts + - ../ui/detection-app/index.html:/app/index.html + - ../ui/framework:/app/node_modules/mpr-ui-framework + volumes: postgres-data: redis-data: diff --git a/ctrl/landing.html b/ctrl/landing.html index 7ecfafc..8de347a 100644 --- a/ctrl/landing.html +++ b/ctrl/landing.html @@ -97,6 +97,11 @@
Chunker
Split media into segments, pipeline visualization
+ +
+
Detection
+
Media brand detection, real-time observability
+