# Chunker Pipeline — Execution Path

## Overview

The chunker pipeline splits a media file into time-based segments using FFmpeg stream-copy. Events flow from worker threads through Redis and gRPC-Web streaming to the browser UI in real time.

**From worker thread to pixel:**

```
Worker thread → Pipeline._emit() → event_bridge() → Redis RPUSH
  → [50ms poll] gRPC server LRANGE → yield protobuf → HTTP/2 frame
  → Envoy (grpc-web filter) → HTTP/1.1 chunk → nginx (proxy_buffering off)
  → fetch ReadableStream → protobuf-ts decode → setEvents([...prev, evt])
  → React re-render
```

---

## Step 1: Job Creation (Browser → GraphQL → Celery)

```
User clicks "Start"
→ App.tsx: handleStart(config)
→ api.ts: createChunkJob(config)
→ POST /graphql (nginx :80 → fastapi:8702)
→ graphql.py: Mutation.create_chunk_job()
   → core.db: creates ChunkJob row in Postgres
   → Celery: run_job.delay(job_type="chunk", job_id=..., payload=...)
→ Returns { id, celery_task_id } to browser
→ App.tsx: setJobId(id) — triggers gRPC stream subscription
```

**Files:** `ui/chunker/src/api.ts`, `core/api/graphql.py`, `core/jobs/task.py`

---

## Step 2: gRPC-Web Stream (Browser → nginx → Envoy → gRPC Server)

Once `jobId` is set, `useGrpcStream(jobId)` opens a server-streaming RPC:

```
useGrpcStream(jobId) fires useEffect
→ GrpcWebFetchTransport({ baseUrl: "/grpc-web" })
→ WorkerServiceClient.streamChunkPipeline({ jobId })
→ fetch() POST to /grpc-web/worker.WorkerService/StreamChunkPipeline
→ nginx :80 /grpc-web/ (proxy_pass → envoy:8090, proxy_buffering off)
→ Envoy :8090 (grpc_web filter: HTTP/1.1 grpc-web → HTTP/2 native gRPC)
→ gRPC server :50051 WorkerServicer.StreamChunkPipeline()
→ Enters Redis polling loop (Step 5)
```

**Files:** `ui/chunker/src/hooks/useGrpcStream.ts`, `ctrl/nginx.conf`, `ctrl/envoy.yaml`, `core/rpc/server.py`

**Key nginx config:** `proxy_buffering off` is critical — without it, nginx collects the entire upstream response before forwarding, defeating streaming entirely.
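The mutation's responsibilities in Step 1 can be sketched as follows. This is a minimal stand-in, not the real resolver: `save_chunk_job` and `enqueue_run_job` are hypothetical substitutes for the Postgres write and the `run_job.delay(...)` Celery dispatch that `graphql.py` actually performs.

```python
import uuid

# Hypothetical in-memory stand-ins for Postgres and the Celery broker.
DB_ROWS: list = []
CELERY_QUEUE: list = []

def save_chunk_job(config: dict) -> str:
    """Stand-in for creating the ChunkJob row in Postgres."""
    job_id = str(uuid.uuid4())
    DB_ROWS.append({"id": job_id, "config": config, "status": "pending"})
    return job_id

def enqueue_run_job(job_id: str, payload: dict) -> str:
    """Stand-in for Celery's run_job.delay(job_type="chunk", ...)."""
    task_id = str(uuid.uuid4())
    CELERY_QUEUE.append({"task_id": task_id, "job_type": "chunk",
                         "job_id": job_id, "payload": payload})
    return task_id

def create_chunk_job(config: dict) -> dict:
    """Shape of the mutation: persist row, enqueue task, return both ids."""
    job_id = save_chunk_job(config)
    task_id = enqueue_run_job(job_id, payload=config)
    return {"id": job_id, "celery_task_id": task_id}

result = create_chunk_job({"source_key": "input.mp4", "chunk_duration": 10})
```

The browser only needs the returned `{ id, celery_task_id }` pair; `setJobId(id)` is what kicks off the gRPC stream in Step 2.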
---

## Step 3: Celery Worker → ChunkHandler

```
Celery picks up run_job task
→ task.py: run_job(job_type="chunk", job_id, payload)
→ registry.get_handler("chunk") → ChunkHandler
→ chunk.py: ChunkHandler.process(job_id, payload)
   → download_to_temp(BUCKET_IN, source_key) — pulls source from MinIO/S3
   → Creates output_dir: /app/media/out/chunks/{job_id}/
   → Constructs event_bridge callback (bridges Pipeline events → Redis)
   → pipeline = Pipeline(source, ..., event_callback=event_bridge, output_dir=...)
   → pipeline.run()
```

**Files:** `core/jobs/task.py`, `core/jobs/handlers/chunk.py`

The `event_bridge` closure wraps every `Pipeline._emit()` call, forwarding to `push_event(job_id, event_type, data)`, which writes to Redis.

---

## Step 4: Pipeline Orchestration (inside Celery worker process)

`Pipeline.run()` spawns multiple threads:

```
pipeline.run():
│
├─ Chunker(source, chunk_duration)
│   → ffprobe source file → gets duration, file_size
│   → calculates total_chunks = ceil(duration / chunk_duration)
│
├─ _emit("pipeline_start", {...}) → event_bridge → Redis
├─ _emit("pipeline_info", {file_size, duration, total_chunks}) → Redis
│
├─ Creates ChunkQueue(maxsize=10)
├─ Creates WorkerPool(num_workers=N, chunk_queue, processor, event_callback)
│
├─ pool.start() — spawns N worker threads
│
├─ MONITOR THREAD starts (_monitor_progress)
│   → Every 500ms: _emit("pipeline_progress", {elapsed, throughput_mbps}) → Redis
│
├─ PRODUCER THREAD starts (_produce_chunks)
│   → Iterates chunker.chunks() → yields Chunk(sequence, start_time, end_time)
│   → For each: chunk_queue.put(chunk)
│       → _emit("chunk_queued", {sequence, start_time, end_time, queue_size}) → Redis
│   → chunk_queue.close() when done (sends N sentinel Nones)
│
├─ WORKER THREADS (N concurrent, each runs worker.py:Worker.run())
│   │  Each worker loops:
│   │
│   ├─ chunk = chunk_queue.get(timeout=1.0)
│   ├─ _emit("chunk_processing", {sequence, state:"processing", queue_size}) → Redis
│   │
│   ├─ processor.process(chunk)
│   │   ├─ ffmpeg: runs `ffmpeg -ss start -to end -c copy chunk_NNNN.mp4`
│   │   ├─ simulated_decode: sleep(random) + checksum
│   │   └─ checksum: reads bytes, computes hash
│   │
│   ├─ On success: _emit("chunk_done", {sequence, processing_time, retries, queue_size}) → Redis
│   ├─ On failure: retries with exponential backoff (0.1s, 0.2s, 0.4s...)
│   │   ├─ _emit("chunk_retry", {sequence, attempt, backoff}) → Redis
│   │   └─ _emit("chunk_error", {sequence, error, retries}) → Redis (after exhaustion)
│   │
│   └─ On sentinel (None): _emit("worker_status", {state:"stopped"}) → Redis
│
├─ pool.wait() — joins all worker threads, collects results
├─ monitor_stop.set() — stops progress monitor
│
├─ ResultCollector — reassembles results in sequence order
│   └─ _emit("chunk_collected", {sequence, buffered, emitted}) → Redis
│
├─ Writes manifest.json to output_dir
└─ _emit("pipeline_complete", {total_chunks, processed, failed, elapsed, throughput}) → Redis
```

**Files:** `core/chunker/pipeline.py`, `core/chunker/worker.py`, `core/chunker/pool.py`, `core/chunker/chunker.py`, `core/chunker/collector.py`

---

## Step 5: Redis — the Event Bus

```
WRITE side (Celery worker, all threads):
  push_event(job_id, event_type, data)
  → json.dumps({"event": event_type, ...data})
  → Redis RPUSH to key "chunk_events:{job_id}"
  → Redis EXPIRE 3600 (1 hour TTL)

READ side (gRPC server, StreamChunkPipeline):
  poll_events(job_id, cursor)
  → Redis LRANGE "chunk_events:{job_id}" cursor -1
  → Returns (parsed_events, new_cursor)
  → Called every 50ms (time.sleep(0.05) in server loop)
```

Redis acts as a decoupling layer between the Celery worker process (which runs the pipeline) and the gRPC server process (which streams to browsers). Events are appended with RPUSH and read with cursor-based LRANGE polling.
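The cursor contract between the two sides of Step 5 can be sketched against a plain Python list standing in for the Redis list (RPUSH ≈ `append`, `LRANGE key cursor -1` ≈ a slice). This is a sketch only: the real `core/events.py` talks to Redis and also sets the 1-hour EXPIRE, both omitted here.

```python
import json

# In-memory stand-in for the Redis list at key "chunk_events:{job_id}".
EVENT_LOG: dict = {}

def push_event(job_id: str, event_type: str, data: dict) -> None:
    """WRITE side: RPUSH one JSON-encoded event (EXPIRE omitted in this sketch)."""
    EVENT_LOG.setdefault(job_id, []).append(
        json.dumps({"event": event_type, **data}))

def poll_events(job_id: str, cursor: int):
    """READ side: LRANGE from cursor to -1, then advance the cursor."""
    raw = EVENT_LOG.get(job_id, [])[cursor:]
    return [json.loads(r) for r in raw], cursor + len(raw)

push_event("job-1", "pipeline_start", {"num_workers": 4})
push_event("job-1", "chunk_queued", {"sequence": 0})

events, cursor = poll_events("job-1", 0)     # first poll sees both events
push_event("job-1", "chunk_done", {"sequence": 0})
more, cursor = poll_events("job-1", cursor)  # next poll sees only the new one
```

Because the cursor only ever moves forward, each 50 ms poll returns exactly the events appended since the previous poll, and a reader that reconnects with cursor 0 replays the full history still inside the TTL window.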
**Files:** `core/events.py`

---

## Step 6: gRPC Server → Envoy → nginx → Browser

```
server.py: StreamChunkPipeline polling loop:

  while context.is_active():
      events, cursor = poll_events(job_id, cursor)   ← Redis LRANGE
      for data in events:
          yield worker_pb2.ChunkPipelineEvent(       ← serialized protobuf message
              job_id, event_type, sequence, worker_id, state,
              queue_size, elapsed, throughput_mbps, total_chunks,
              processed_chunks, failed_chunks, error, processing_time, retries
          )
          if event_type in ("pipeline_complete", "pipeline_error"):
              return                                 ← ends the stream
      time.sleep(0.05)                               ← 50ms poll interval

Each yield sends:
  → gRPC HTTP/2 DATA frame to Envoy
  → Envoy grpc_web filter: HTTP/2 → base64-encoded grpc-web-text
  → nginx proxy_pass (proxy_buffering off) → chunked HTTP/1.1 to browser
  → fetch() ReadableStream in GrpcWebFetchTransport
  → @protobuf-ts decodes protobuf → ChunkPipelineEvent TypeScript object
```

**Files:** `core/rpc/server.py`, `ctrl/envoy.yaml`, `ctrl/nginx.conf`, `ui/common/api/grpc/worker.ts`, `ui/common/api/grpc/worker.client.ts`

---

## Step 7: React State Derivation and Rendering

```
useGrpcStream.ts:
  for await (const msg of stream.responses):
      const evt = toEvent(msg)           ← maps protobuf camelCase → snake_case PipelineEvent
      setEvents(prev => [...prev, evt])  ← appends to events array
      if pipeline_complete/error → setDone(true), break

App.tsx useMemo(events):
  Iterates ALL events on every update, derives:
  ├─ chunkMap: Map — state machine per chunk
  │    pending → queued → processing → done/error/retry
  ├─ workerMap: Map — state per worker
  │    idle → processing → idle → ... → stopped
  ├─ stats: PipelineStats
  │    total_chunks, processed, failed, retries, elapsed, throughput_mbps, queue_size
  ├─ errors: ErrorEntry[] — every event containing an error field
  └─ queueSize: number — last seen queue_size value

Renders:
  ├─ ChunkGrid — colored cells per chunk (pending/queued/processing/done/error)
  ├─ QueueGauge — current queue depth / max
  ├─ WorkerPanel — per-worker state + current chunk assignment
  ├─ StatsPanel — elapsed time, throughput, processed/failed counts
  ├─ ErrorLog — scrollable error list
  └─ OutputFiles — download links (when done)
```

**Files:** `ui/chunker/src/hooks/useGrpcStream.ts`, `ui/chunker/src/App.tsx`

---

## Step 8: Output File Access (after pipeline completes)

```
App.tsx useEffect([done, jobId]):
→ api.ts: getChunkOutputFiles(jobId) → POST /graphql
→ graphql.py: chunk_output_files(job_id)
   → Reads /app/media/out/chunks/{job_id}/ directory listing from disk
   → Returns [{key, size, url: "/media/out/chunks/{job_id}/chunk_0001.mp4"}]
→ Browser renders download links
→ Click link → nginx /media/out/ → alias /app/media/out/ → serves file from disk
```

Chunks are written directly to `media/out/chunks/{job_id}/` by the ffmpeg processor — no MinIO upload needed for output. Nginx serves them with `autoindex on`.
**Files:** `core/api/graphql.py`, `core/jobs/handlers/chunk.py`, `ctrl/nginx.conf`

---

## Event Types Reference

| Event | Source | Key Fields |
|-------|--------|------------|
| `pipeline_start` | Pipeline.run() | source, chunk_duration, num_workers, processor_type |
| `pipeline_info` | Pipeline.run() | file_size, source_duration, total_chunks |
| `pipeline_progress` | Monitor thread (500ms) | elapsed, throughput_mbps |
| `chunk_queued` | Producer thread | sequence, start_time, end_time, duration, queue_size |
| `chunk_processing` | Worker thread | sequence, worker_id, state, queue_size |
| `chunk_done` | Worker thread | sequence, processing_time, retries, queue_size |
| `chunk_retry` | Worker thread | sequence, attempt, backoff |
| `chunk_error` | Worker thread | sequence, error, retries |
| `chunk_collected` | ResultCollector | sequence, buffered, emitted |
| `worker_status` | Worker thread | worker_id, state (idle/processing/stopped) |
| `pipeline_complete` | Pipeline.run() | total_chunks, processed, failed, elapsed, throughput_mbps |
| `pipeline_error` | Pipeline.run() | error |

---

## Thread Model (inside Celery worker)

```
Celery worker process
└─ run_job task thread
   └─ Pipeline.run()
      ├─ Producer thread — enqueues chunks
      ├─ Monitor thread — emits progress every 500ms
      ├─ Worker thread 0 — pulls from queue, processes
      ├─ Worker thread 1 — pulls from queue, processes
      ├─ Worker thread 2 — pulls from queue, processes
      └─ Worker thread 3 — pulls from queue, processes
```

All threads share the same `event_callback` → `event_bridge` → `push_event()` chain. Thread safety comes from Redis's atomic RPUSH; each call opens a new Redis connection.
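The producer/worker/sentinel handshake in the thread model above can be sketched with the stdlib `queue.Queue`. This is a simplified stand-in: the real `ChunkQueue` wraps the same idea with `maxsize=10` and a `close()` helper that enqueues the N sentinels, and `emit` here stands in for `event_bridge` → `push_event` (Redis RPUSH).

```python
import queue
import threading

NUM_WORKERS = 4
chunk_queue: queue.Queue = queue.Queue(maxsize=10)
events: list = []
events_lock = threading.Lock()

def emit(event_type: str, **data):
    """Stand-in for event_bridge -> push_event; a lock replaces Redis atomicity."""
    with events_lock:
        events.append((event_type, data))

def producer(total_chunks: int):
    for seq in range(total_chunks):
        chunk_queue.put(seq)
        emit("chunk_queued", sequence=seq)
    for _ in range(NUM_WORKERS):     # close(): one sentinel None per worker
        chunk_queue.put(None)

def worker(worker_id: int):
    while True:
        chunk = chunk_queue.get(timeout=1.0)
        if chunk is None:            # sentinel -> this worker stops
            emit("worker_status", worker_id=worker_id, state="stopped")
            return
        emit("chunk_done", sequence=chunk, worker_id=worker_id)

threads = [threading.Thread(target=producer, args=(8,))]
threads += [threading.Thread(target=worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

done = sorted(d["sequence"] for e, d in events if e == "chunk_done")
```

One sentinel per worker guarantees every worker thread observes exactly one `None` and exits, which is why `pool.wait()` can simply join all threads without a separate shutdown flag.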
---

## Infrastructure

| Service | Port | Role |
|---------|------|------|
| nginx | 80 | Reverse proxy, static file serving |
| fastapi | 8702 | GraphQL API (Strawberry) |
| celery | — | Task worker (runs pipeline) |
| redis | 6379 | Event bus + Celery broker |
| grpc | 50051 | gRPC server (StreamChunkPipeline) |
| envoy | 8090 | gRPC-Web ↔ native gRPC translation |
| minio | 9000 | S3-compatible source media storage |
| postgres | 5432 | Job/asset metadata |
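Assuming these services are run under Docker Compose (the `/app/...` paths suggest containers, but the document does not say so), the port layout in the table could be sketched as the following fragment. Service names and ports come from the table; everything else (images, env, wiring) is deliberately omitted because it is not specified here.

```yaml
# Illustrative sketch only; not the repo's actual compose file.
services:
  nginx:    { ports: ["80:80"] }      # reverse proxy + static files, the only public port
  fastapi:  { expose: ["8702"] }      # GraphQL API (Strawberry)
  celery:   {}                        # task worker; no listening port
  redis:    { expose: ["6379"] }      # event bus + Celery broker
  grpc:     { expose: ["50051"] }     # StreamChunkPipeline server
  envoy:    { expose: ["8090"] }      # gRPC-Web <-> native gRPC translation
  minio:    { expose: ["9000"] }      # S3-compatible source storage
  postgres: { expose: ["5432"] }      # job/asset metadata
```

Only nginx publishes a host port; every other service is reachable solely on the internal network, which matches the proxying paths in Steps 1, 2, and 8.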