This commit is contained in:
2026-03-23 09:58:40 -03:00
parent 9c9c7dff09
commit 8186bb5fe6
40 changed files with 3996 additions and 17 deletions

72
core/api/detect_sse.py Normal file
View File

@@ -0,0 +1,72 @@
"""
SSE endpoint for detection pipeline events.
Uses Redis as the event bus between pipeline workers and the SSE stream.
Mirrors chunker_sse.py but polls detect_events:{job_id}.
GET /detect/stream/{job_id} → text/event-stream
"""
import asyncio
import json
import logging
import time
from typing import AsyncGenerator
from fastapi import APIRouter
from starlette.responses import StreamingResponse
from core.events import poll_events
from detect.events import DETECT_EVENTS_PREFIX, TERMINAL_EVENTS
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/detect", tags=["detect"])
async def _event_generator(job_id: str) -> AsyncGenerator[str, None]:
    """Yield SSE-formatted frames for *job_id* until a terminal event or timeout.

    Polls the Redis event list (``{DETECT_EVENTS_PREFIX}:{job_id}``) via
    ``poll_events`` and emits:
      - ``waiting`` heartbeats while no new events are available,
      - one named event per pipeline event (type taken from the event's
        ``event`` key, defaulting to ``update``),
      - a final ``done`` frame after the first terminal event, then stops,
      - a ``timeout`` frame if the 1-hour budget is exhausted.

    Args:
        job_id: Detection job identifier used to key the Redis event list.
    """
    cursor = 0
    deadline = time.monotonic() + 3600  # 1 hour max (detection jobs are long)
    while time.monotonic() < deadline:
        # poll_events uses a synchronous Redis client (see core/events.py);
        # run it in a worker thread so the blocking network round-trip does
        # not stall the event loop for every connected SSE client.
        events, cursor = await asyncio.to_thread(
            poll_events, job_id, cursor, prefix=DETECT_EVENTS_PREFIX
        )
        if not events:
            yield f"event: waiting\ndata: {json.dumps({'job_id': job_id})}\n\n"
            await asyncio.sleep(0.1)
            continue
        for data in events:
            event_type = data.pop("event", "update")
            payload = {**data, "job_id": job_id}
            yield f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"
            if event_type in TERMINAL_EVENTS:
                yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n"
                return
        await asyncio.sleep(0.05)
    yield f"event: timeout\ndata: {json.dumps({'job_id': job_id})}\n\n"
@router.get("/stream/{job_id}")
async def stream_detect_job(job_id: str):
    """
    Open an SSE stream for one detection pipeline job.

    Consumed by the browser's native EventSource:

        const es = new EventSource('/api/detect/stream/<job_id>');
        es.addEventListener('graph_update', (e) => { ... });
        es.addEventListener('detection', (e) => { ... });
    """
    # Disable caching and proxy buffering (X-Accel-Buffering for nginx) so
    # events reach the client as soon as they are yielded.
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        _event_generator(job_id),
        media_type="text/event-stream",
        headers=sse_headers,
    )

View File

@@ -24,6 +24,7 @@ from fastapi.middleware.cors import CORSMiddleware
from strawberry.fastapi import GraphQLRouter
from core.api.chunker_sse import router as chunker_router
from core.api.detect_sse import router as detect_router
from core.api.graphql import schema as graphql_schema
CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "")
@@ -52,6 +53,9 @@ app.include_router(graphql_router, prefix="/graphql")
# Chunker SSE
app.include_router(chunker_router)
# Detection SSE
app.include_router(detect_router)
@app.get("/")
def root():

View File

@@ -17,19 +17,23 @@ def _get_redis():
return redis.from_url(REDIS_URL, decode_responses=True)
def push_event(job_id: str, event_type: str, data: dict) -> None:
def push_event(
job_id: str, event_type: str, data: dict, prefix: str = "chunk_events"
) -> None:
"""Push an event to the Redis list for a job."""
r = _get_redis()
key = f"chunk_events:{job_id}"
key = f"{prefix}:{job_id}"
event = json.dumps({"event": event_type, **data})
r.rpush(key, event)
r.expire(key, 3600)
def poll_events(job_id: str, cursor: int = 0) -> tuple[list[dict], int]:
def poll_events(
job_id: str, cursor: int = 0, prefix: str = "chunk_events"
) -> tuple[list[dict], int]:
"""Poll new events from Redis. Returns (events, new_cursor)."""
r = _get_redis()
key = f"chunk_events:{job_id}"
key = f"{prefix}:{job_id}"
raw_events = r.lrange(key, cursor, -1)
parsed = []
for raw in raw_events:

View File

@@ -20,6 +20,16 @@
"target": "protobuf",
"output": "core/rpc/protos/worker.proto",
"include": ["grpc"]
},
{
"target": "pydantic",
"output": "detect/sse_contract.py",
"include": ["detect_views"]
},
{
"target": "typescript",
"output": "ui/detection-app/src/types/sse-contract.ts",
"include": ["detect_views"]
}
]
}

View File

@@ -28,6 +28,7 @@ from .grpc import (
from .jobs import ChunkJob, ChunkJobStatus, JobStatus, TranscodeJob
from .media import AssetStatus, MediaAsset
from .presets import BUILTIN_PRESETS, TranscodePreset
from .detect import DETECT_VIEWS # noqa: F401 — discovered by modelgen generic loader
from .views import ChunkEvent, ChunkOutputFile, PipelineStats, WorkerEvent
# Core domain models - generates Django, Pydantic, TypeScript
@@ -50,6 +51,7 @@ ENUMS = [AssetStatus, JobStatus, ChunkJobStatus]
# View/event models - generates TypeScript for UI consumption
VIEWS = [ChunkEvent, WorkerEvent, PipelineStats, ChunkOutputFile]
# gRPC messages - generates Proto
GRPC_MESSAGES = [
JobRequest,

View File

@@ -0,0 +1,166 @@
"""
Detection Pipeline Schema Definitions
Source of truth for all detection SSE events and wire-format models.
Generates: Pydantic (detect/sse_contract.py), TypeScript (ui/detection-app/src/types/sse-contract.ts)
Pipeline-internal models that never cross the wire (e.g. Frame with np.ndarray)
live in detect/models.py and are NOT generated.
"""
from dataclasses import dataclass, field
from typing import List, Literal, Optional
# --- Enums as Literal unions (wire format, not Python Enum) ---
NodeStatus = Literal["idle", "processing", "completed", "error"]  # documents GraphNode.status
DetectionSource = Literal["ocr", "local_vlm", "cloud_llm", "logo_match", "auxiliary"]  # documents Detection.source / BoundingBoxEvent.source
LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]  # documents LogEvent.level
# --- Nested components ---
@dataclass
class GraphNode:
    """A pipeline stage node.

    Carried in :class:`GraphUpdate` (SSE event ``graph_update``).
    """
    id: str  # stage identifier; referenced by GraphEdge.source/target
    # Plain str on the wire; expected values are the NodeStatus literals
    # ("idle" | "processing" | "completed" | "error").
    status: str = "idle"  # NodeStatus
    items_in: int = 0   # counter — per its name, items entering the stage (confirm with producer)
    items_out: int = 0  # counter — per its name, items leaving the stage (confirm with producer)
@dataclass
class GraphEdge:
    """An edge between pipeline stages.

    Carried in :class:`GraphUpdate` (SSE event ``graph_update``).
    """
    source: str  # id of the upstream GraphNode (presumed from GraphNode.id)
    target: str  # id of the downstream GraphNode
    throughput: int = 0  # counter — units not visible here; presumably items traversed
@dataclass
class BoundingBoxEvent:
    """Bounding box in SSE event payloads.

    Used by ``FrameUpdate.boxes`` and ``Detection.bbox``. Integer
    coordinates — pixel units presumed; confirm against the producer.
    """
    x: int  # box origin (top-left presumed)
    y: int
    w: int  # box width
    h: int  # box height
    confidence: float
    label: str  # raw detector label, before brand resolution
    resolved_brand: Optional[str] = None  # canonical brand once resolved, if any
    source: Optional[str] = None  # DetectionSource literal when set
@dataclass
class BrandSummary:
    """Per-brand stats in the final report (``DetectionReportSummary.brands``)."""
    brand: str
    total_appearances: int = 0
    total_screen_time: float = 0.0  # presumably seconds — confirm against producer
    avg_confidence: float = 0.0
    first_seen: float = 0.0  # timestamp of first appearance; units not shown, presumably seconds into the video
    last_seen: float = 0.0   # timestamp of last appearance, same units as first_seen
# --- SSE event payloads ---
@dataclass
class GraphUpdate:
    """Pipeline node state transition. SSE event: graph_update"""
    nodes: List[GraphNode] = field(default_factory=list)  # node states in this update
    edges: List[GraphEdge] = field(default_factory=list)  # edge throughputs in this update
    active_path: List[str] = field(default_factory=list)  # presumably GraphNode ids on the currently-active path — confirm with producer
@dataclass
class StatsUpdate:
    """Funnel statistics snapshot. SSE event: stats_update

    The ``frames_*``/``regions_*`` counters mirror the escalation funnel:
    extraction → scene filter → region detection → OCR → local VLM →
    cloud LLM (field names; stage wiring not visible here).
    """
    frames_extracted: int = 0
    frames_after_scene_filter: int = 0
    regions_detected: int = 0
    regions_resolved_by_ocr: int = 0
    regions_escalated_to_local_vlm: int = 0
    regions_escalated_to_cloud_llm: int = 0
    cloud_llm_calls: int = 0
    processing_time_seconds: float = 0.0
    estimated_cloud_cost_usd: float = 0.0
@dataclass
class FrameUpdate:
    """Current frame being processed. SSE event: frame_update"""
    frame_ref: int    # frame identifier; Detection.frame_ref links back to this
    timestamp: float  # frame time — presumably seconds into the video
    jpeg_b64: str     # per its name, base64-encoded JPEG preview of the frame
    boxes: List[BoundingBoxEvent] = field(default_factory=list)  # boxes found on this frame
@dataclass
class Detection:
    """A confirmed brand detection. SSE event: detection"""
    brand: str
    timestamp: float  # when the detection occurs — presumably seconds into the video
    duration: float   # presumably seconds on screen — confirm with producer
    confidence: float
    # Plain str on the wire; expected values are the DetectionSource literals.
    source: str  # DetectionSource
    content_type: str
    bbox: Optional[BoundingBoxEvent] = None  # location in frame, when available
    frame_ref: Optional[int] = None          # links to FrameUpdate.frame_ref
@dataclass
class LogEvent:
    """Pipeline log line. SSE event: log"""
    # Plain str on the wire; expected values are the LogLevel literals.
    level: str  # LogLevel
    stage: str  # pipeline stage that emitted the line
    msg: str
    ts: str     # timestamp as a string — format not visible here (ISO-8601 presumed; confirm)
    trace_id: Optional[str] = None  # correlation id when tracing is enabled (presumed)
@dataclass
class DetectionReportSummary:
    """Final detection report summary (carried by ``JobComplete.report``)."""
    video_source: str
    content_type: str
    duration_seconds: float
    total_detections: int = 0
    brands: List[BrandSummary] = field(default_factory=list)  # per-brand rollups
    stats: Optional[StatsUpdate] = None  # final funnel snapshot, if produced
@dataclass
class JobComplete:
    """Final report when pipeline finishes. SSE event: job_complete"""
    job_id: str
    report: Optional[DetectionReportSummary] = None  # may be absent — presumably when the job ends without a report; confirm with producer
# --- Export lists for modelgen ---
# Every wire-format model above. Imported by the core models package for the
# modelgen loader, which generates the Pydantic (detect/sse_contract.py) and
# TypeScript (ui/detection-app/src/types/sse-contract.ts) contracts.
# Declaration order is kept here; presumably generators preserve it — keep
# nested components (GraphNode, BoundingBoxEvent, ...) before their users.
DETECT_VIEWS = [
    GraphNode,
    GraphEdge,
    BoundingBoxEvent,
    BrandSummary,
    GraphUpdate,
    StatsUpdate,
    FrameUpdate,
    Detection,
    LogEvent,
    DetectionReportSummary,
    JobComplete,
]