diff --git a/admin/mpr/celery.py b/admin/mpr/celery.py index 93c2776..358c62e 100644 --- a/admin/mpr/celery.py +++ b/admin/mpr/celery.py @@ -7,4 +7,4 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "admin.mpr.settings") app = Celery("mpr") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() -app.autodiscover_tasks(["core.task"]) +app.autodiscover_tasks(["core.jobs"]) diff --git a/admin/mpr/media_assets/models.py b/admin/mpr/media_assets/models.py index 2ee9e29..6576093 100644 --- a/admin/mpr/media_assets/models.py +++ b/admin/mpr/media_assets/models.py @@ -19,6 +19,15 @@ class JobStatus(models.TextChoices): FAILED = "failed", "Failed" CANCELLED = "cancelled", "Cancelled" +class ChunkJobStatus(models.TextChoices): + PENDING = "pending", "Pending" + CHUNKING = "chunking", "Chunking" + PROCESSING = "processing", "Processing" + COLLECTING = "collecting", "Collecting" + COMPLETED = "completed", "Completed" + FAILED = "failed", "Failed" + CANCELLED = "cancelled", "Cancelled" + class MediaAsset(models.Model): """A video/audio file registered in the system.""" @@ -108,3 +117,34 @@ class TranscodeJob(models.Model): def __str__(self): return str(self.id) + +class ChunkJob(models.Model): + """A chunk pipeline job — splits a media file into chunks and processes them""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + source_asset_id = models.UUIDField() + chunk_duration = models.FloatField(default=10.0) + num_workers = models.IntegerField(default=4) + max_retries = models.IntegerField(default=3) + processor_type = models.CharField(max_length=255) + status = models.CharField(max_length=20, choices=ChunkJobStatus.choices, default=ChunkJobStatus.PENDING) + progress = models.FloatField(default=0.0) + total_chunks = models.IntegerField(default=0) + processed_chunks = models.IntegerField(default=0) + failed_chunks = models.IntegerField(default=0) + retry_count = models.IntegerField(default=0) + error_message 
= models.TextField(blank=True, default='') + throughput_mbps = models.FloatField(null=True, blank=True, default=None) + elapsed_seconds = models.FloatField(null=True, blank=True, default=None) + celery_task_id = models.CharField(max_length=255, null=True, blank=True) + priority = models.IntegerField(default=0) + created_at = models.DateTimeField(auto_now_add=True) + started_at = models.DateTimeField(null=True, blank=True) + completed_at = models.DateTimeField(null=True, blank=True) + + class Meta: + ordering = ["-created_at"] + + def __str__(self): + return str(self.id) + diff --git a/core/api/chunker_sse.py b/core/api/chunker_sse.py new file mode 100644 index 0000000..2958e11 --- /dev/null +++ b/core/api/chunker_sse.py @@ -0,0 +1,73 @@ +""" +SSE endpoint for chunker pipeline events. + +Uses Redis as the event bus between Celery workers and the SSE stream. +Celery worker pushes events via core.events, SSE endpoint polls them. + +GET /chunker/stream/{job_id} → text/event-stream +""" + +import asyncio +import json +import logging +import time +from typing import AsyncGenerator + +from fastapi import APIRouter +from starlette.responses import StreamingResponse + +from core.events import poll_events + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/chunker", tags=["chunker"]) + + +async def _event_generator(job_id: str) -> AsyncGenerator[str, None]: + """ + Generate SSE events by polling Redis for chunk job events. 
+ """ + cursor = 0 + timeout = time.monotonic() + 600 # 10 min max + + while time.monotonic() < timeout: + events, cursor = poll_events(job_id, cursor) + + if not events: + yield f"event: waiting\ndata: {json.dumps({'job_id': job_id})}\n\n" + await asyncio.sleep(0.1) + continue + + for data in events: + event_type = data.pop("event", "update") + payload = {**data, "job_id": job_id} + + yield f"event: {event_type}\ndata: {json.dumps(payload)}\n\n" + + if event_type in ("pipeline_complete", "pipeline_error", "cancelled"): + yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n" + return + + await asyncio.sleep(0.05) + + yield f"event: timeout\ndata: {json.dumps({'job_id': job_id})}\n\n" + + +@router.get("/stream/{job_id}") +async def stream_chunk_job(job_id: str): + """ + SSE stream for a chunk pipeline job. + + The UI connects via native EventSource: + const es = new EventSource('/api/chunker/stream/{job_id}'); + es.addEventListener('processing', (e) => { ... }); + """ + return StreamingResponse( + _event_generator(job_id), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/core/api/graphql.py b/core/api/graphql.py index 7a88e8f..6cd3b41 100644 --- a/core/api/graphql.py +++ b/core/api/graphql.py @@ -15,6 +15,10 @@ from strawberry.schema.config import StrawberryConfig from strawberry.types import Info from core.api.schema.graphql import ( + CancelResultType, + ChunkJobType, + ChunkOutputFileType, + CreateChunkJobInput, CreateJobInput, DeleteResultType, MediaAssetType, @@ -24,7 +28,7 @@ from core.api.schema.graphql import ( TranscodePresetType, UpdateAssetInput, ) -from core.storage import BUCKET_IN, list_objects +from core.storage import BUCKET_IN, list_objects, upload_file VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"} AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} @@ -88,6 +92,25 @@ class Query: def 
system_status(self, info: Info) -> SystemStatusType: return SystemStatusType(status="ok", version="0.1.0") + @strawberry.field + def chunk_output_files(self, info: Info, job_id: str) -> List[ChunkOutputFileType]: + """List output chunk files for a completed job from media/out/.""" + from pathlib import Path + + media_out = os.environ.get("MEDIA_OUT_DIR", "/app/media/out") + output_dir = Path(media_out) / "chunks" / job_id + if not output_dir.is_dir(): + return [] + return [ + ChunkOutputFileType( + key=f.name, + size=f.stat().st_size, + url=f"/media/out/chunks/{job_id}/{f.name}", + ) + for f in sorted(output_dir.iterdir()) + if f.is_file() + ] + # --------------------------------------------------------------------------- # Mutations @@ -98,8 +121,26 @@ class Query: class Mutation: @strawberry.mutation def scan_media_folder(self, info: Info) -> ScanResultType: + import logging + from pathlib import Path + from core.db import create_asset, get_asset_filenames + logger = logging.getLogger(__name__) + + # Sync local media/in/ files to MinIO (handles fresh installs / pruned volumes) + local_media = Path("/app/media/in") + if local_media.is_dir(): + existing_keys = {o["key"] for o in list_objects(BUCKET_IN)} + for f in local_media.iterdir(): + if f.is_file() and f.suffix.lower() in MEDIA_EXTS: + if f.name not in existing_keys: + try: + upload_file(str(f), BUCKET_IN, f.name) + logger.info("Uploaded %s to MinIO", f.name) + except Exception as e: + logger.warning("Failed to upload %s: %s", f.name, e) + objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS) existing = get_asset_filenames() @@ -172,30 +213,31 @@ class Mutation: priority=input.priority or 0, ) + payload = { + "source_key": source.file_path, + "output_key": output_filename, + "preset": preset_snapshot or None, + "trim_start": input.trim_start, + "trim_end": input.trim_end, + "duration": source.duration, + } + executor_mode = os.environ.get("MPR_EXECUTOR", "local") if executor_mode in ("lambda", "gcp"): - 
@strawberry.mutation
def create_chunk_job(self, info: Info, input: CreateChunkJobInput) -> ChunkJobType:
    """Create and dispatch a chunk pipeline job.

    Resolves the source asset, generates a fresh job id, and hands the job
    to the executor selected by the ``MPR_EXECUTOR`` environment variable:
    ``"lambda"`` / ``"gcp"`` go through ``get_executor().run(...)``, anything
    else is queued on Celery via ``run_job.delay(...)``.

    Args:
        info: Strawberry resolver context (unused).
        input: Job parameters from the client.

    Returns:
        A ``ChunkJobType`` in ``"pending"`` state. ``celery_task_id`` is set
        only on the Celery path.

    Raises:
        Exception: "Source asset not found" when the asset lookup fails; the
            underlying error is attached as ``__cause__``.
    """
    import uuid

    from core.db import get_asset

    try:
        source = get_asset(input.source_asset_id)
    except Exception as exc:
        # Same message and type as before; chaining keeps the real failure
        # (missing row, database outage, ...) visible in logs and tracebacks.
        raise Exception("Source asset not found") from exc

    job_id = str(uuid.uuid4())

    # NOTE(review): input.priority is echoed in the response below but is
    # not part of the dispatched payload — confirm that is intended.
    payload = {
        "source_key": source.file_path,
        "chunk_duration": input.chunk_duration,
        "num_workers": input.num_workers,
        "max_retries": input.max_retries,
        "processor_type": input.processor_type,
        "start_time": input.start_time,
        "end_time": input.end_time,
    }

    executor_mode = os.environ.get("MPR_EXECUTOR", "local")
    celery_task_id = None

    if executor_mode in ("lambda", "gcp"):
        from core.jobs.executor import get_executor

        get_executor().run(
            job_type="chunk",
            job_id=job_id,
            payload=payload,
        )
    else:
        from core.jobs.task import run_job

        result = run_job.delay(
            job_type="chunk",
            job_id=job_id,
            payload=payload,
        )
        # Returned to the client so it can later revoke the task.
        celery_task_id = result.id

    return ChunkJobType(
        id=uuid.UUID(job_id),
        source_asset_id=input.source_asset_id,
        chunk_duration=input.chunk_duration,
        num_workers=input.num_workers,
        max_retries=input.max_retries,
        processor_type=input.processor_type,
        status="pending",
        progress=0.0,
        priority=input.priority,
        celery_task_id=celery_task_id,
    )
@strawberry.type
class ChunkOutputFileType:
    """A single output file produced by a chunk job, as exposed to API clients.

    NOTE(review): the ``chunk_output_files`` resolver in this change builds
    these from files in a local ``<MEDIA_OUT_DIR>/chunks/<job_id>`` directory
    and fills ``url`` with a ``/media/out/chunks/<job_id>/<name>`` path, not
    an S3/MinIO presigned URL — confirm which is intended.
    """

    # File name of the chunk (the resolver passes ``Path.name``).
    key: str
    # Size in bytes (the resolver passes ``st_size``).
    # NOTE(review): GraphQL ``Int`` is 32-bit; presumably fine for chunk-sized
    # files, but confirm no output can exceed ~2 GiB.
    size: int = 0
    # Location the client can fetch the file from.
    url: str = ""
--- /dev/null +++ b/core/chunker/__init__.py @@ -0,0 +1,64 @@ +""" +Chunker pipeline — splits files into chunks, processes concurrently, reassembles in order. + +Public API: + Pipeline — orchestrates the full pipeline + PipelineResult — aggregate result dataclass + Chunker — file → Chunk generator + ChunkQueue — bounded thread-safe queue + WorkerPool — manages N worker threads + ResultCollector — heapq-based ordered reassembly +""" + +from .chunker import Chunker +from .collector import ResultCollector +from .exceptions import ( + ChunkChecksumError, + ChunkError, + ChunkReadError, + PipelineError, + ProcessingError, + ProcessorFailureError, + ProcessorTimeoutError, + ReassemblyError, +) +from .models import Chunk, ChunkResult, PipelineResult +from .pipeline import Pipeline +from .pool import WorkerPool +from .processor import ( + ChecksumProcessor, + CompositeProcessor, + FFmpegExtractProcessor, + Processor, + SimulatedDecodeProcessor, +) +from .queue import ChunkQueue + +__all__ = [ + # Core + "Pipeline", + "PipelineResult", + # Components + "Chunker", + "ChunkQueue", + "WorkerPool", + "ResultCollector", + # Models + "Chunk", + "ChunkResult", + # Processors + "Processor", + "ChecksumProcessor", + "SimulatedDecodeProcessor", + "CompositeProcessor", + "FFmpegExtractProcessor", + # Exceptions + "PipelineError", + "ChunkError", + "ChunkReadError", + "ChunkChecksumError", + "ProcessingError", + "ProcessorFailureError", + "ProcessorTimeoutError", + "ReassemblyError", +] diff --git a/core/chunker/chunker.py b/core/chunker/chunker.py new file mode 100644 index 0000000..53a2eb0 --- /dev/null +++ b/core/chunker/chunker.py @@ -0,0 +1,101 @@ +""" +Chunker — probes a media file and yields time-based Chunk objects. 
+ +Demonstrates: +- Function parameters and defaults (Interview Topic 1) +- List comprehensions and efficient iteration / generators (Interview Topic 3) +""" + +import math +import os +from typing import Generator + +from core.ffmpeg.probe import probe_file + +from .exceptions import ChunkReadError +from .models import Chunk + + +class Chunker: + """ + Splits a media file into time-based chunks via a generator. + + Uses FFmpeg probe to get duration, then yields Chunk objects + representing time segments (no data read — extraction happens in the processor). + + Args: + file_path: Path to the source media file + chunk_duration: Duration of each chunk in seconds (default: 10.0) + """ + + def __init__( + self, + file_path: str, + chunk_duration: float = 10.0, + start_time: float | None = None, + end_time: float | None = None, + ): + if not os.path.isfile(file_path): + raise ChunkReadError(f"File not found: {file_path}") + if chunk_duration <= 0: + raise ValueError("chunk_duration must be positive") + + self.file_path = file_path + self.chunk_duration = chunk_duration + self.file_size = os.path.getsize(file_path) + full_duration = self._probe_duration() + + # Apply time range + self.range_start = max(start_time or 0.0, 0.0) + self.range_end = min(end_time or full_duration, full_duration) + if self.range_start >= self.range_end: + raise ValueError( + f"Invalid range: start={self.range_start} >= end={self.range_end}" + ) + self.source_duration = self.range_end - self.range_start + + def _probe_duration(self) -> float: + """Get source file duration via FFmpeg probe.""" + try: + result = probe_file(self.file_path) + if result.duration is None or result.duration <= 0: + raise ChunkReadError( + f"Cannot determine duration for {self.file_path}" + ) + return result.duration + except ChunkReadError: + raise + except Exception as e: + raise ChunkReadError( + f"Failed to probe {self.file_path}: {e}" + ) from e + + @property + def expected_chunks(self) -> int: + """Calculate 
class ResultCollector:
    """
    Puts chunk results that arrive in any order back into sequence order.

    Results are parked in a min-heap keyed by sequence number. A result is
    released only once every lower sequence number has been released, so the
    emitted stream is always 0, 1, 2, ... without gaps.

    Args:
        total_chunks: Expected total number of chunks
    """

    def __init__(self, total_chunks: int):
        self.total_chunks = total_chunks
        # Sequence number the next released result must carry.
        self._next_sequence = 0
        self._seen_sequences: set[int] = set()
        self._heap: List[tuple[int, ChunkResult]] = []
        self._emitted: List[ChunkResult] = []
        # Sliding window (last 50 samples) of per-chunk processing times.
        self._recent_times: deque[float] = deque(maxlen=50)

    def add(self, result: ChunkResult) -> List[ChunkResult]:
        """
        Record one result and release everything that is now in order.

        Args:
            result: A ChunkResult (may arrive out of order)

        Returns:
            The results released by this call, in sequence order. Empty when
            an earlier sequence number is still missing.

        Raises:
            ReassemblyError: If this sequence number was already added
        """
        seq = result.sequence
        if seq in self._seen_sequences:
            raise ReassemblyError(
                f"Duplicate sequence number: {result.sequence}"
            )
        self._seen_sequences.add(seq)

        # Only positive timings feed the sliding-window average.
        took = result.processing_time
        if took > 0:
            self._recent_times.append(took)

        pending = self._heap
        heapq.heappush(pending, (seq, result))

        # Drain the heap for as long as its smallest entry is the one we
        # are waiting for.
        ready: List[ChunkResult] = []
        while pending:
            head_seq, head = pending[0]
            if head_seq != self._next_sequence:
                break
            heapq.heappop(pending)
            ready.append(head)
            self._next_sequence += 1

        self._emitted.extend(ready)
        return ready

    @property
    def is_complete(self) -> bool:
        """True once every expected chunk has been released in order."""
        return self.total_chunks == self._next_sequence

    @property
    def buffered_count(self) -> int:
        """How many results are parked in the heap awaiting earlier ones."""
        return len(self._heap)

    @property
    def emitted_count(self) -> int:
        """How many results have been released in sequence order."""
        return len(self._emitted)

    @property
    def avg_processing_time(self) -> float:
        """Mean processing time over the sliding window (0.0 when empty)."""
        window = self._recent_times
        return sum(window) / len(window) if window else 0.0

    def get_ordered_results(self) -> List[ChunkResult]:
        """Return a copy of all released results, in sequence order."""
        return self._emitted.copy()
@dataclass
class PipelineResult:
    """Aggregate result of the entire pipeline run.

    Built once at the end of ``Pipeline.run``; every field has a default so
    an empty run can return ``PipelineResult(chunks_in_order=True)``.
    """

    # Number of chunks the chunker expected to produce.
    total_chunks: int = 0
    # Number of results returned by the workers. Pipeline.run sets this to
    # len(all_results), so it includes failed chunks as well.
    processed: int = 0
    # Number of results whose ``success`` flag is False.
    failed: int = 0
    # Sum of the per-chunk retry counts.
    retries: int = 0
    # Wall-clock duration of the run in seconds (time.monotonic based).
    elapsed_time: float = 0.0
    # Source file size in MiB divided by elapsed_time.
    # NOTE(review): uses the whole file size even when only a time range was
    # processed — confirm that is the intended meaning.
    throughput_mbps: float = 0.0
    # Per-worker counters as returned by WorkerPool.get_worker_stats().
    worker_stats: Dict[str, Any] = field(default_factory=dict)
    # Error messages of the failed chunks (results without a message are skipped).
    errors: List[str] = field(default_factory=list)
    # True when the collector released every expected sequence number in order.
    chunks_in_order: bool = True
    # Directory the chunk files were written to, if any.
    output_dir: Optional[str] = None
    # Output file paths of the successful chunks that produced a file.
    chunk_files: List[str] = field(default_factory=list)
class Pipeline:
    """
    Orchestrates the chunk processing pipeline.

    The pipeline runs in three stages:
    1. Producer thread: Chunker probes file → pushes time-based chunks to ChunkQueue
    2. Worker pool: N workers pull from queue → process each chunk → emit results
    3. Collector: ResultCollector reassembles results in sequence order

    Args:
        source: Path to the source media file
        chunk_duration: Duration of each chunk in seconds (default: 10.0)
        num_workers: Number of concurrent worker threads (default: 4)
        max_retries: Max retry attempts per chunk (default: 3)
        processor_type: Processor to use — "ffmpeg", "checksum", "simulated_decode", "composite"
        queue_size: Max chunks buffered in queue (default: 10)
        event_callback: Optional callback for real-time events
        output_dir: Directory for output chunk files (required for "ffmpeg" processor)
        start_time: Optional start of the time range to process, in seconds
        end_time: Optional end of the time range to process, in seconds
    """

    def __init__(
        self,
        source: str,
        chunk_duration: float = 10.0,
        num_workers: int = 4,
        max_retries: int = 3,
        processor_type: str = "checksum",
        queue_size: int = 10,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
    ):
        self.source = source
        self.chunk_duration = chunk_duration
        self.num_workers = num_workers
        self.max_retries = max_retries
        self.processor_type = processor_type
        self.queue_size = queue_size
        self.event_callback = event_callback
        self.output_dir = output_dir
        self.start_time = start_time
        self.end_time = end_time

    def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
        """Emit an event if callback is registered."""
        if self.event_callback:
            self.event_callback(event_type, data)

    def _produce_chunks(
        self, chunker: Chunker, chunk_queue: ChunkQueue
    ) -> None:
        """Producer thread: enqueue time-based chunks, then close the queue.

        Any failure is logged and reported as a ``producer_error`` event; the
        queue is closed in every case so the workers can terminate.
        """
        try:
            for chunk in chunker.chunks():
                # Bounded put: blocks while the queue is full (backpressure).
                chunk_queue.put(chunk, timeout=30.0)
                self._emit("chunk_queued", {
                    "sequence": chunk.sequence,
                    "start_time": chunk.start_time,
                    "end_time": chunk.end_time,
                    "duration": chunk.duration,
                    "queue_size": chunk_queue.qsize(),
                })
        except Exception as e:
            logger.error(f"Producer error: {e}")
            self._emit("producer_error", {"error": str(e)})
        finally:
            chunk_queue.close()

    def _monitor_progress(
        self, start_time: float, file_size: int, stop_event: threading.Event
    ) -> None:
        """Monitor thread: emit pipeline_progress every 500ms until stopped.

        Args:
            start_time: ``time.monotonic()`` value taken when the run began
            file_size: Source file size in bytes
            stop_event: Set by the pipeline to end the loop
        """
        while not stop_event.is_set():
            elapsed = time.monotonic() - start_time
            mb = file_size / (1024 * 1024)
            self._emit("pipeline_progress", {
                "elapsed": round(elapsed, 2),
                "throughput_mbps": round(mb / elapsed, 2) if elapsed > 0 else 0,
            })
            # wait() doubles as the sleep and returns early once stopped.
            stop_event.wait(0.5)

    def _write_manifest(
        self, result: PipelineResult, source_duration: float
    ) -> None:
        """Write manifest.json to output_dir with segment metadata.

        Does nothing when no ``output_dir`` is configured. The directory is
        created if missing, because only the "ffmpeg" processor creates it.
        Segment ``start``/``end`` values are offsets from the beginning of
        the processed range (``i * chunk_duration``).

        Args:
            result: Aggregate result of the run
            source_duration: Length of the processed range in seconds
        """
        if not self.output_dir:
            return

        segments = [
            {
                "sequence": i,
                "file": f"chunk_{i:04d}.mp4",
                "start": i * self.chunk_duration,
                # The last segment may be shorter than chunk_duration.
                "end": min(
                    (i + 1) * self.chunk_duration, source_duration
                ),
            }
            for i in range(result.total_chunks)
        ]

        manifest = {
            "source": self.source,
            "source_duration": source_duration,
            "chunk_duration": self.chunk_duration,
            "total_chunks": result.total_chunks,
            "processed": result.processed,
            "failed": result.failed,
            "elapsed_time": result.elapsed_time,
            "throughput_mbps": result.throughput_mbps,
            "segments": segments,
        }

        out_dir = Path(self.output_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        manifest_path = out_dir / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
        logger.info(f"Manifest written to {manifest_path}")

    def run(self) -> PipelineResult:
        """
        Execute the full pipeline.

        Returns:
            PipelineResult with aggregate stats

        Raises:
            PipelineError: If the pipeline fails catastrophically. Other
                exceptions are reported as a ``pipeline_error`` event and
                re-raised wrapped in PipelineError.
        """
        run_started = time.monotonic()
        self._emit("pipeline_start", {
            "source": self.source,
            "chunk_duration": self.chunk_duration,
            "num_workers": self.num_workers,
            "processor_type": self.processor_type,
        })

        try:
            # Stage 1: Set up chunker (probes file for duration)
            chunker = Chunker(
                self.source,
                self.chunk_duration,
                start_time=self.start_time,
                end_time=self.end_time,
            )
            total_chunks = chunker.expected_chunks

            if total_chunks == 0:
                self._emit("pipeline_complete", {"total_chunks": 0})
                return PipelineResult(chunks_in_order=True)

            self._emit("pipeline_info", {
                "file_size": chunker.file_size,
                "source_duration": chunker.source_duration,
                "total_chunks": total_chunks,
            })

            # Stage 2: Set up queue and worker pool
            chunk_queue = ChunkQueue(maxsize=self.queue_size)
            pool = WorkerPool(
                num_workers=self.num_workers,
                chunk_queue=chunk_queue,
                processor_type=self.processor_type,
                max_retries=self.max_retries,
                event_callback=self.event_callback,
                output_dir=self.output_dir,
            )

            # Stage 3: Start workers, monitor, then produce chunks
            pool.start()

            monitor_stop = threading.Event()
            monitor = threading.Thread(
                target=self._monitor_progress,
                args=(run_started, chunker.file_size, monitor_stop),
                name="progress-monitor",
                daemon=True,
            )
            producer = threading.Thread(
                target=self._produce_chunks,
                args=(chunker, chunk_queue),
                name="chunk-producer",
                daemon=True,
            )

            monitor.start()
            try:
                producer.start()

                # Stage 4: Wait for all workers to finish
                all_results = pool.wait()
                producer.join(timeout=5.0)
            finally:
                # Always stop the monitor; otherwise a failure here would
                # leave it emitting progress events for the rest of the
                # process lifetime.
                monitor_stop.set()
                monitor.join(timeout=2.0)

            # From here on all workers have returned, so shutting the pool
            # down is safe whether or not the remaining steps succeed.
            try:
                # Stage 5: Collect results in order
                collector = ResultCollector(total_chunks)
                for r in all_results:
                    collector.add(r)
                    self._emit("chunk_collected", {
                        "sequence": r.sequence,
                        "success": r.success,
                        "buffered": collector.buffered_count,
                        "emitted": collector.emitted_count,
                    })

                # Build result
                elapsed = time.monotonic() - run_started
                file_size_mb = chunker.file_size / (1024 * 1024)
                throughput = file_size_mb / elapsed if elapsed > 0 else 0.0

                failed_results = [r for r in all_results if not r.success]
                total_retries = sum(r.retries for r in all_results)
                chunk_files = [
                    r.output_file for r in all_results
                    if r.success and r.output_file
                ]

                result = PipelineResult(
                    total_chunks=total_chunks,
                    processed=len(all_results),
                    failed=len(failed_results),
                    retries=total_retries,
                    elapsed_time=elapsed,
                    throughput_mbps=throughput,
                    worker_stats=pool.get_worker_stats(),
                    errors=[r.error for r in failed_results if r.error],
                    chunks_in_order=collector.is_complete,
                    output_dir=self.output_dir,
                    chunk_files=chunk_files,
                )

                # Write manifest if output_dir is set
                self._write_manifest(result, chunker.source_duration)
            finally:
                pool.shutdown()

            self._emit("pipeline_complete", {
                "total_chunks": result.total_chunks,
                "processed": result.processed,
                "failed": result.failed,
                "elapsed": result.elapsed_time,
                "throughput_mbps": result.throughput_mbps,
            })

            return result

        except PipelineError:
            raise
        except Exception as e:
            self._emit("pipeline_error", {"error": str(e)})
            raise PipelineError(f"Pipeline failed: {e}") from e
class WorkerPool:
    """
    Manages N worker threads that process chunks concurrently.

    Args:
        num_workers: Number of concurrent worker threads (default: 4)
        chunk_queue: Shared queue to pull chunks from (a new ChunkQueue is
            created when omitted)
        processor_type: Type of processor for each worker (default: "checksum")
        max_retries: Max retry attempts per chunk (default: 3)
        event_callback: Optional callback for real-time events
        output_dir: Output directory handed to the processor factory
    """

    def __init__(
        self,
        num_workers: int = 4,
        chunk_queue: Optional[ChunkQueue] = None,
        processor_type: str = "checksum",
        max_retries: int = 3,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
    ):
        self.num_workers = num_workers
        self.chunk_queue = chunk_queue if chunk_queue is not None else ChunkQueue()
        self.processor_type = processor_type
        self.max_retries = max_retries
        self.event_callback = event_callback
        self.output_dir = output_dir
        self.shutdown_event = threading.Event()
        self._executor: Optional[ThreadPoolExecutor] = None
        self._futures: List[Future] = []
        self._workers: List[Worker] = []

    def start(self) -> None:
        """Start all worker threads.

        Each worker gets its own processor instance and is submitted to a
        ThreadPoolExecutor sized to ``num_workers``.
        """
        self._executor = ThreadPoolExecutor(
            max_workers=self.num_workers,
            thread_name_prefix="chunk-worker",
        )

        for i in range(self.num_workers):
            worker = Worker(
                worker_id=f"worker-{i}",
                chunk_queue=self.chunk_queue,
                processor=create_processor(self.processor_type, output_dir=self.output_dir),
                max_retries=self.max_retries,
                event_callback=self.event_callback,
            )
            self._workers.append(worker)
            future = self._executor.submit(worker.run)
            self._futures.append(future)

        logger.info(f"WorkerPool started with {self.num_workers} workers")

    def wait(self) -> List[ChunkResult]:
        """Wait for all workers to finish and collect results.

        Returns:
            The concatenation of every worker's result list, grouped by
            worker in start order (not globally ordered by sequence).

        Raises:
            Exception: Whatever a worker's ``run`` raised, re-raised by
                ``Future.result()``.
        """
        all_results = []
        for future in self._futures:
            all_results.extend(future.result())
        return all_results

    def shutdown(self) -> None:
        """Signal shutdown and cleanup.

        Sets ``shutdown_event``, closes the queue if nobody has closed it
        yet, and shuts the executor down, waiting for its threads.
        """
        self.shutdown_event.set()
        # The producer normally closes the queue already. Closing it again
        # would push a second sentinel, which blocks forever when the
        # bounded queue is full (e.g. queue size 1 with the first sentinel
        # still in it).
        if not self.chunk_queue.is_closed:
            self.chunk_queue.close()
        if self._executor:
            self._executor.shutdown(wait=True)

    def get_worker_stats(self) -> Dict[str, Any]:
        """Get per-worker statistics, keyed by worker id."""
        return {
            w.worker_id: {
                "processed": w.processed_count,
                "errors": w.error_count,
                "retries": w.retry_count,
            }
            for w in self._workers
        }
class ChecksumProcessor(Processor):
    """
    Sanity-checks a chunk's time range; no media data is read or hashed.

    A chunk passes when its duration is positive and its end time lies after
    its start time. Anything else raises ChunkChecksumError.
    """

    def process(self, chunk: Chunk) -> ChunkResult:
        """Validate ``chunk`` and return a successful ChunkResult.

        Raises:
            ChunkChecksumError: If the chunk's time range is invalid
        """
        began = time.monotonic()

        # Kept as a positive test that is then negated, so non-comparable
        # values such as NaN are rejected as well.
        has_length = chunk.duration > 0
        if not (has_length and chunk.end_time > chunk.start_time):
            raise ChunkChecksumError(
                sequence=chunk.sequence,
                expected="valid time range",
                actual=f"{chunk.start_time}-{chunk.end_time}",
            )

        took = time.monotonic() - began

        return ChunkResult(
            sequence=chunk.sequence,
            success=True,
            checksum_valid=True,
            processing_time=took,
        )
# Sentinel value to signal workers to stop
_SENTINEL = object()


class ChunkQueue:
    """
    Thread-safe bounded queue for chunks.

    Provides backpressure: producers block when the queue is full,
    preventing unbounded memory usage. Shutdown is signalled with a single
    sentinel item that every consumer observes and passes on.

    Args:
        maxsize: Maximum number of chunks in the queue (default: 10)
    """

    def __init__(self, maxsize: int = 10):
        # Local import: the surrounding module only imports queue and typing.
        import threading

        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._closed = False
        # Serialises close() so exactly one sentinel is ever enqueued.
        self._close_lock = threading.Lock()
        self.maxsize = maxsize

    def put(self, chunk: Chunk, timeout: Optional[float] = None) -> None:
        """
        Add a chunk to the queue. Blocks if full (backpressure).

        Args:
            chunk: The chunk to enqueue
            timeout: Max seconds to wait (None = block forever)

        Raises:
            queue.Full: If timeout expires while queue is full
        """
        self._queue.put(chunk, timeout=timeout)

    def get(self, timeout: Optional[float] = None) -> Optional[Chunk]:
        """
        Get next chunk from queue. Returns None if queue is closed.

        Args:
            timeout: Max seconds to wait (None = block forever)

        Returns:
            Chunk or None (if sentinel received, meaning queue is closed)

        Raises:
            queue.Empty: If timeout expires while queue is empty
        """
        item = self._queue.get(timeout=timeout)
        if item is _SENTINEL:
            # Re-put sentinel so other workers also see it
            self._queue.put(_SENTINEL)
            return None
        return item

    def close(self) -> None:
        """Signal all consumers to stop by inserting a sentinel.

        Idempotent: only the first call enqueues the sentinel. A second
        sentinel is never needed because ``get`` re-queues the one it sees,
        and adding one could block forever on a full bounded queue.
        """
        with self._close_lock:
            if self._closed:
                return
            self._closed = True
        # Outside the lock: this put may block until a consumer frees a slot.
        self._queue.put(_SENTINEL)

    @property
    def is_closed(self) -> bool:
        """True once close() has been called."""
        return self._closed

    def qsize(self) -> int:
        """Current number of items in the queue (approximate)."""
        return self._queue.qsize()
logger = logging.getLogger(__name__)


class Worker:
    """
    Processes chunks from a queue with retry and exponential backoff.

    Args:
        worker_id: Identifier for this worker (e.g. "worker-0")
        chunk_queue: Source queue to pull chunks from
        processor: Processor instance to use
        max_retries: Maximum retry attempts per chunk (default: 3)
        event_callback: Optional callback for real-time status updates
        base_backoff: Delay in seconds before the first retry; doubles on
            every further retry (default: 0.1)
    """

    def __init__(
        self,
        worker_id: str,
        chunk_queue: "ChunkQueue",
        processor: "Processor",
        max_retries: int = 3,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        base_backoff: float = 0.1,
    ):
        self.worker_id = worker_id
        self.chunk_queue = chunk_queue
        self.processor = processor
        self.max_retries = max_retries
        self.event_callback = event_callback
        self.base_backoff = base_backoff
        # Counters updated by run() / _process_with_retry()
        self.processed_count = 0
        self.error_count = 0
        self.retry_count = 0

    def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
        """
        Emit an event if callback is registered.

        Events are telemetry: a raising callback is logged and otherwise
        ignored, so it can neither be mistaken for a processor failure nor
        stop the worker loop.
        """
        if not self.event_callback:
            return
        try:
            self.event_callback(event_type, {"worker_id": self.worker_id, **data})
        except Exception:
            logger.exception(
                "%s: event callback failed for %r", self.worker_id, event_type
            )

    def _process_with_retry(self, chunk: "Chunk") -> "ChunkResult":
        """
        Process a chunk with exponential backoff retry.

        Retry delays: base_backoff, 2x, 4x, ... (doubles each attempt);
        0.1s, 0.2s, 0.4s with the default base.

        Returns:
            The processor's result stamped with the attempt count and worker
            id, or a failed ChunkResult once every attempt has raised.
        """
        last_error = None

        for attempt in range(self.max_retries + 1):
            try:
                if attempt > 0:
                    backoff = self.base_backoff * (2 ** (attempt - 1))
                    self._emit("chunk_retry", {
                        "sequence": chunk.sequence,
                        "attempt": attempt,
                        "backoff": backoff,
                    })
                    time.sleep(backoff)
                    self.retry_count += 1

                result = self.processor.process(chunk)
                result.retries = attempt
                result.worker_id = self.worker_id
                return result

            except Exception as e:
                # Any processor error is retryable; remember the latest one
                # so it can be reported if the attempts run out.
                last_error = e
                logger.warning(
                    "%s: chunk %s attempt %s/%s failed: %s",
                    self.worker_id,
                    chunk.sequence,
                    attempt + 1,
                    self.max_retries + 1,
                    e,
                )

        # All retries exhausted
        self.error_count += 1
        self._emit("chunk_error", {
            "sequence": chunk.sequence,
            "error": str(last_error),
            "retries": self.max_retries,
        })

        return ChunkResult(
            sequence=chunk.sequence,
            success=False,
            processing_time=0.0,
            error=str(last_error),
            retries=self.max_retries,
            worker_id=self.worker_id,
        )

    def run(self) -> "list[ChunkResult]":
        """
        Main worker loop — pull chunks and process until queue is closed.

        Returns:
            List of ChunkResults processed by this worker (failed chunks
            included)
        """
        results = []
        self._emit("worker_status", {"state": "idle"})

        while True:
            try:
                chunk = self.chunk_queue.get(timeout=1.0)
            except queue.Empty:
                # Nothing available yet; keep polling until the sentinel.
                continue

            if chunk is None:  # Sentinel received
                break

            self._emit("chunk_processing", {
                "sequence": chunk.sequence,
                "state": "processing",
                "queue_size": self.chunk_queue.qsize(),
            })

            result = self._process_with_retry(chunk)
            results.append(result)
            self.processed_count += 1

            self._emit("chunk_done", {
                "sequence": chunk.sequence,
                "success": result.success,
                "processing_time": result.processing_time,
                "retries": result.retries,
                "queue_size": self.chunk_queue.qsize(),
            })

        self._emit("worker_status", {"state": "stopped"})
        return results
Returns (events, new_cursor).""" + r = _get_redis() + key = f"chunk_events:{job_id}" + raw_events = r.lrange(key, cursor, -1) + parsed = [] + for raw in raw_events: + try: + parsed.append(json.loads(raw)) + except (json.JSONDecodeError, TypeError): + pass + return parsed, cursor + len(raw_events) diff --git a/core/jobs/__init__.py b/core/jobs/__init__.py new file mode 100644 index 0000000..8827db9 --- /dev/null +++ b/core/jobs/__init__.py @@ -0,0 +1,15 @@ +""" +MPR Jobs Module + +Provides executor abstraction and task dispatch for job processing. +""" + +from .executor import Executor, LocalExecutor, get_executor +from .task import run_job + +__all__ = [ + "Executor", + "LocalExecutor", + "get_executor", + "run_job", +] diff --git a/core/task/executor.py b/core/jobs/executor.py similarity index 53% rename from core/task/executor.py rename to core/jobs/executor.py index aadf783..ef1e6cd 100644 --- a/core/task/executor.py +++ b/core/jobs/executor.py @@ -1,17 +1,16 @@ """ Executor abstraction for job processing. -Supports different backends: -- LocalExecutor: FFmpeg via Celery (default) -- LambdaExecutor: AWS Lambda (future) +Determines WHERE jobs run: +- LocalExecutor: delegates to registered Handler (default) +- LambdaExecutor: AWS Step Functions +- GCPExecutor: Google Cloud Run Jobs """ import os from abc import ABC, abstractmethod from typing import Any, Callable, Dict, Optional -from core.ffmpeg.transcode import TranscodeConfig, transcode - # Configuration from environment MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local") @@ -22,26 +21,18 @@ class Executor(ABC): @abstractmethod def run( self, + job_type: str, job_id: str, - source_path: str, - output_path: str, - preset: Optional[Dict[str, Any]] = None, - trim_start: Optional[float] = None, - trim_end: Optional[float] = None, - duration: Optional[float] = None, + payload: Dict[str, Any], progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: """ - Execute a transcode/trim job. 
+ Execute a job. Args: + job_type: Type of job ("transcode", "chunk", etc.) job_id: Unique job identifier - source_path: Path to source file - output_path: Path for output file - preset: Transcode preset dict (optional, None = trim only) - trim_start: Trim start time in seconds (optional) - trim_end: Trim end time in seconds (optional) - duration: Source duration in seconds (for progress calculation) + payload: Job-type-specific configuration dict progress_callback: Called with (percent, details_dict) Returns: @@ -51,62 +42,25 @@ class Executor(ABC): class LocalExecutor(Executor): - """Execute jobs locally using FFmpeg.""" + """Execute jobs locally using registered handlers.""" def run( self, + job_type: str, job_id: str, - source_path: str, - output_path: str, - preset: Optional[Dict[str, Any]] = None, - trim_start: Optional[float] = None, - trim_end: Optional[float] = None, - duration: Optional[float] = None, + payload: Dict[str, Any], progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: - """Execute job using local FFmpeg.""" + """Execute job using the appropriate local handler.""" + from .registry import get_handler - # Build config from preset or use stream copy for trim-only - if preset: - config = TranscodeConfig( - input_path=source_path, - output_path=output_path, - video_codec=preset.get("video_codec", "libx264"), - video_bitrate=preset.get("video_bitrate"), - video_crf=preset.get("video_crf"), - video_preset=preset.get("video_preset"), - resolution=preset.get("resolution"), - framerate=preset.get("framerate"), - audio_codec=preset.get("audio_codec", "aac"), - audio_bitrate=preset.get("audio_bitrate"), - audio_channels=preset.get("audio_channels"), - audio_samplerate=preset.get("audio_samplerate"), - container=preset.get("container", "mp4"), - extra_args=preset.get("extra_args", []), - trim_start=trim_start, - trim_end=trim_end, - ) - else: - # Trim-only: stream copy - config = TranscodeConfig( - input_path=source_path, - 
output_path=output_path, - video_codec="copy", - audio_codec="copy", - trim_start=trim_start, - trim_end=trim_end, - ) - - # Wrapper to convert float percent to int - def wrapped_callback(percent: float, details: Dict[str, Any]) -> None: - if progress_callback: - progress_callback(int(percent), details) - - return transcode( - config, - duration=duration, - progress_callback=wrapped_callback if progress_callback else None, + handler = get_handler(job_type) + result = handler.process( + job_id=job_id, + payload=payload, + progress_callback=progress_callback, ) + return result.get("status") == "completed" class LambdaExecutor(Executor): @@ -123,26 +77,18 @@ class LambdaExecutor(Executor): def run( self, + job_type: str, job_id: str, - source_path: str, - output_path: str, - preset: Optional[Dict[str, Any]] = None, - trim_start: Optional[float] = None, - trim_end: Optional[float] = None, - duration: Optional[float] = None, + payload: Dict[str, Any], progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: """Start a Step Functions execution for this job.""" import json - payload = { + sfn_payload = { + "job_type": job_type, "job_id": job_id, - "source_key": source_path, - "output_key": output_path, - "preset": preset, - "trim_start": trim_start, - "trim_end": trim_end, - "duration": duration, + **payload, "callback_url": self.callback_url, "api_key": self.callback_api_key, } @@ -150,10 +96,9 @@ class LambdaExecutor(Executor): response = self.sfn.start_execution( stateMachineArn=self.state_machine_arn, name=f"mpr-{job_id}", - input=json.dumps(payload), + input=json.dumps(sfn_payload), ) - # Store execution ARN on the job execution_arn = response["executionArn"] try: from core.db import update_job_fields @@ -179,13 +124,9 @@ class GCPExecutor(Executor): def run( self, + job_type: str, job_id: str, - source_path: str, - output_path: str, - preset: Optional[Dict[str, Any]] = None, - trim_start: Optional[float] = None, - trim_end: 
Optional[float] = None, - duration: Optional[float] = None, + payload: Dict[str, Any], progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, ) -> bool: """Trigger a Cloud Run Job execution for this job.""" @@ -193,14 +134,10 @@ class GCPExecutor(Executor): from google.cloud import run_v2 - payload = { + gcp_payload = { + "job_type": job_type, "job_id": job_id, - "source_key": source_path, - "output_key": output_path, - "preset": preset, - "trim_start": trim_start, - "trim_end": trim_end, - "duration": duration, + **payload, "callback_url": self.callback_url, "api_key": self.callback_api_key, } @@ -216,7 +153,8 @@ class GCPExecutor(Executor): run_v2.RunJobRequest.Overrides.ContainerOverride( env=[ run_v2.EnvVar( - name="MPR_JOB_PAYLOAD", value=json.dumps(payload) + name="MPR_JOB_PAYLOAD", + value=json.dumps(gcp_payload), ) ] ) diff --git a/core/task/gcp_handler.py b/core/jobs/gcp_handler.py similarity index 100% rename from core/task/gcp_handler.py rename to core/jobs/gcp_handler.py diff --git a/core/jobs/handlers/__init__.py b/core/jobs/handlers/__init__.py new file mode 100644 index 0000000..8ac89c6 --- /dev/null +++ b/core/jobs/handlers/__init__.py @@ -0,0 +1,5 @@ +"""Job handlers — type-specific execution logic.""" + +from .base import Handler + +__all__ = ["Handler"] diff --git a/core/jobs/handlers/base.py b/core/jobs/handlers/base.py new file mode 100644 index 0000000..f47328f --- /dev/null +++ b/core/jobs/handlers/base.py @@ -0,0 +1,33 @@ +""" +Base Handler ABC — defines the interface for job-type-specific execution logic. + +A Handler knows HOW to execute a specific kind of job (transcode, chunk, etc.). +The Executor decides WHERE to run it (local, Lambda, GCP). 
class ChunkHandler(Handler):
    """
    Handles chunk processing jobs by delegating to the chunker Pipeline.

    Expected payload keys:
        source_key: str — S3 key of the source file in BUCKET_IN
        chunk_duration: float — seconds per chunk (default: 10.0)
        num_workers: int — concurrent workers (default: 4)
        max_retries: int — retries per chunk (default: 3)
        processor_type: str — "ffmpeg", "checksum", "simulated_decode", "composite"
        queue_size: int — max queue depth (default: 10)
        start_time / end_time: optional, passed through to the Pipeline
    """

    def process(
        self,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """
        Download the source, run the chunk pipeline and report the outcome.

        Args:
            job_id: Unique job identifier (also names the output directory)
            payload: Chunk job configuration (see class docstring)
            progress_callback: Called with (percent, details_dict)

        Returns:
            Result dict whose "status" is "completed" or
            "completed_with_errors", plus pipeline statistics and the
            relative paths of the produced chunk files

        Raises:
            KeyError: If payload has no "source_key"
            Exception: Whatever the download or pipeline raises, re-raised
                after a "pipeline_error" event has been pushed
        """
        source_key = payload["source_key"]
        processor_type = payload.get("processor_type", "ffmpeg")

        logger.info(f"ChunkHandler starting job {job_id}: {source_key}")

        push_chunk_event(job_id, "pipeline_start", {"status": "downloading", "source_key": source_key})

        # Output directory: media/out/chunks/{job_id}/
        output_dir = os.path.join(MEDIA_OUT_DIR, "chunks", job_id)
        tmp_source = None

        try:
            # Download and directory creation run inside the try so that a
            # failure here still pushes the terminal "pipeline_error" event
            # and still cleans up the temp file.
            tmp_source = download_to_temp(BUCKET_IN, source_key)
            if processor_type == "ffmpeg":
                os.makedirs(output_dir, exist_ok=True)

            def event_bridge(event_type: str, data: Dict[str, Any]) -> None:
                """Bridge pipeline events to Redis + optional progress callback."""
                push_chunk_event(job_id, event_type, data)

                if progress_callback and event_type == "pipeline_complete":
                    progress_callback(100, data)
                elif progress_callback and event_type == "chunk_done":
                    # NOTE(review): presumably the Pipeline adds
                    # "total_chunks" to chunk_done events; without it the
                    # default of 1 makes every chunk report 99% — confirm.
                    total = data.get("total_chunks", 1)
                    if total > 0:
                        # Capped at 99: 100 is reserved for pipeline_complete.
                        pct = min(int((data.get("sequence", 0) + 1) / total * 100), 99)
                        progress_callback(pct, data)

            pipeline = Pipeline(
                source=tmp_source,
                chunk_duration=payload.get("chunk_duration", 10.0),
                num_workers=payload.get("num_workers", 4),
                max_retries=payload.get("max_retries", 3),
                processor_type=processor_type,
                queue_size=payload.get("queue_size", 10),
                event_callback=event_bridge,
                output_dir=output_dir if processor_type == "ffmpeg" else None,
                start_time=payload.get("start_time"),
                end_time=payload.get("end_time"),
            )

            result = pipeline.run()

            # Files are already in media/out/chunks/{job_id}/
            output_prefix = f"chunks/{job_id}"
            output_files = [
                f"{output_prefix}/{os.path.basename(f)}"
                for f in result.chunk_files
            ]

            push_chunk_event(job_id, "pipeline_complete", {
                "status": "completed",
                "total_chunks": result.total_chunks,
                "processed": result.processed,
                "failed": result.failed,
                "elapsed": result.elapsed_time,
                "throughput_mbps": result.throughput_mbps,
            })

            return {
                "status": "completed" if result.failed == 0 else "completed_with_errors",
                "total_chunks": result.total_chunks,
                "processed": result.processed,
                "failed": result.failed,
                "retries": result.retries,
                "elapsed_time": result.elapsed_time,
                "throughput_mbps": result.throughput_mbps,
                "worker_stats": result.worker_stats,
                "errors": result.errors,
                "chunks_in_order": result.chunks_in_order,
                "output_prefix": output_prefix,
                "output_files": output_files,
            }

        except Exception as e:
            push_chunk_event(job_id, "pipeline_error", {"status": "failed", "error": str(e)})
            raise

        finally:
            # Cleanup temp source file only (output dir is persistent)
            if tmp_source is not None:
                try:
                    os.unlink(tmp_source)
                except OSError:
                    pass
class TranscodeHandler(Handler):
    """Handle transcode and trim jobs via FFmpeg."""

    def process(
        self,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """
        Download the source, transcode or trim it, and upload the result.

        Args:
            job_id: Unique job identifier
            payload: Requires "source_key" and "output_key"; optional
                "preset" (dict; absent means stream-copy trim only),
                "trim_start", "trim_end" and "duration"
            progress_callback: Called with (percent, details_dict)

        Returns:
            {"status": "completed", "job_id": ..., "output_key": ...}

        Raises:
            KeyError: If a required payload key is missing
            RuntimeError: If transcode() returns a falsy value
        """
        source_key = payload["source_key"]
        output_key = payload["output_key"]
        preset = payload.get("preset")
        trim_start = payload.get("trim_start")
        trim_end = payload.get("trim_end")
        duration = payload.get("duration")

        logger.info(f"TranscodeHandler: {source_key} -> {output_key}")

        # Download source
        tmp_source = download_to_temp(BUCKET_IN, source_key)
        tmp_output = None

        try:
            # The output temp file is created inside the try so the
            # downloaded source is still removed if creating it fails.
            ext = Path(output_key).suffix or ".mp4"
            fd, tmp_output = tempfile.mkstemp(suffix=ext)
            os.close(fd)

            if preset:
                config = TranscodeConfig(
                    input_path=tmp_source,
                    output_path=tmp_output,
                    video_codec=preset.get("video_codec", "libx264"),
                    video_bitrate=preset.get("video_bitrate"),
                    video_crf=preset.get("video_crf"),
                    video_preset=preset.get("video_preset"),
                    resolution=preset.get("resolution"),
                    framerate=preset.get("framerate"),
                    audio_codec=preset.get("audio_codec", "aac"),
                    audio_bitrate=preset.get("audio_bitrate"),
                    audio_channels=preset.get("audio_channels"),
                    audio_samplerate=preset.get("audio_samplerate"),
                    container=preset.get("container", "mp4"),
                    extra_args=preset.get("extra_args", []),
                    trim_start=trim_start,
                    trim_end=trim_end,
                )
            else:
                # No preset: trim only, copying both streams unchanged
                config = TranscodeConfig(
                    input_path=tmp_source,
                    output_path=tmp_output,
                    video_codec="copy",
                    audio_codec="copy",
                    trim_start=trim_start,
                    trim_end=trim_end,
                )

            def wrapped_callback(percent: float, details: Dict[str, Any]) -> None:
                # transcode() passes a float percent; the job-level callback
                # takes an int.
                if progress_callback:
                    progress_callback(int(percent), details)

            success = transcode(
                config,
                duration=duration,
                progress_callback=wrapped_callback if progress_callback else None,
            )

            if not success:
                raise RuntimeError("Transcode returned False")

            # Upload result
            logger.info(f"Uploading {output_key} to {BUCKET_OUT}")
            upload_file(tmp_output, BUCKET_OUT, output_key)

            return {
                "status": "completed",
                "job_id": job_id,
                "output_key": output_key,
            }

        finally:
            # Best-effort removal of both temp files
            for f in (tmp_source, tmp_output):
                if f is None:
                    continue
                try:
                    os.unlink(f)
                except OSError:
                    pass
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def run_job(
    self,
    job_type: str,
    job_id: str,
    payload: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Generic Celery task — dispatches to the registered handler for job_type.

    Args:
        job_type: Key of the handler to use (looked up via get_handler)
        job_id: Unique job identifier used for progress updates
        payload: Job-type-specific configuration passed to the handler

    Returns:
        The handler's result dict on success, or
        {"status": "failed", "job_id": ..., "error": ...} once all retries
        are exhausted.

    Raises:
        celery.exceptions.Retry: While retries remain after a failure
    """
    logger.info(f"Starting {job_type} job {job_id}")

    update_job_progress(job_id, progress=0, status="processing")

    def progress_callback(percent: int, details: Dict[str, Any]) -> None:
        update_job_progress(
            job_id,
            progress=percent,
            current_time=details.get("time", 0.0),
            status="processing",
        )

    try:
        from .registry import get_handler

        handler = get_handler(job_type)
        result = handler.process(
            job_id=job_id,
            payload=payload,
            progress_callback=progress_callback,
        )

        logger.info(f"Job {job_id} completed successfully")
        update_job_progress(job_id, progress=100, status="completed")
        return result

    except Exception as e:
        logger.exception(f"Job {job_id} failed: {e}")

        if self.request.retries < self.max_retries:
            # A retry is about to be scheduled, so the job is not finished:
            # do not publish the terminal "failed" status. The next attempt
            # resets progress to 0 / "processing" when it starts.
            raise self.retry(exc=e)

        # Retries exhausted: only now is the failure final.
        update_job_progress(job_id, progress=0, status="failed", error=str(e))

        return {
            "status": "failed",
            "job_id": job_id,
            "error": str(e),
        }
{ // Empty } + +message ChunkStreamRequest { + string job_id = 1; +} + +message ChunkPipelineEvent { + string job_id = 1; + string event_type = 2; + int32 sequence = 3; + string worker_id = 4; + string state = 5; + int32 queue_size = 6; + float elapsed = 7; + float throughput_mbps = 8; + int32 total_chunks = 9; + int32 processed_chunks = 10; + int32 failed_chunks = 11; + string error = 12; + float processing_time = 13; + int32 retries = 14; +} diff --git a/core/rpc/server.py b/core/rpc/server.py index f4ae778..5d412da 100644 --- a/core/rpc/server.py +++ b/core/rpc/server.py @@ -59,17 +59,24 @@ class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer): # Dispatch to Celery if available if self.celery_app: - from core.task.tasks import run_transcode_job + from core.jobs.task import run_job - task = run_transcode_job.delay( - job_id=job_id, - source_path=request.source_path, - output_path=request.output_path, - preset=preset, - trim_start=request.trim_start + payload = { + "source_key": request.source_path, + "output_key": request.output_path, + "preset": preset, + "trim_start": request.trim_start if request.HasField("trim_start") else None, - trim_end=request.trim_end if request.HasField("trim_end") else None, + "trim_end": request.trim_end + if request.HasField("trim_end") + else None, + } + + task = run_job.delay( + job_type="transcode", + job_id=job_id, + payload=payload, ) _active_jobs[job_id]["celery_task_id"] = task.id @@ -166,6 +173,43 @@ class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer): message="Job not found", ) + def StreamChunkPipeline(self, request, context) -> Iterator[worker_pb2.ChunkPipelineEvent]: + """Stream chunk pipeline events for a job.""" + from core.events import poll_events + + job_id = request.job_id + logger.info(f"StreamChunkPipeline: {job_id}") + + cursor = 0 + timeout = time.monotonic() + 600 # 10 min max + + while context.is_active() and time.monotonic() < timeout: + events, cursor = poll_events(job_id, cursor) + + for data 
in events: + event_type = data.pop("event", "") + yield worker_pb2.ChunkPipelineEvent( + job_id=job_id, + event_type=event_type, + sequence=data.get("sequence", 0), + worker_id=data.get("worker_id", ""), + state=data.get("state", ""), + queue_size=data.get("queue_size", 0), + elapsed=data.get("elapsed", 0.0), + throughput_mbps=data.get("throughput_mbps", 0.0), + total_chunks=data.get("total_chunks", 0), + processed_chunks=data.get("processed_chunks", 0), + failed_chunks=data.get("failed_chunks", 0), + error=data.get("error", ""), + processing_time=data.get("processing_time", 0.0), + retries=data.get("retries", 0), + ) + + if event_type in ("pipeline_complete", "pipeline_error"): + return + + time.sleep(0.05) + def GetWorkerStatus(self, request, context): """Get worker health and capabilities.""" try: @@ -197,11 +241,14 @@ def update_job_progress( speed: float = 0.0, status: str = "processing", error: str = None, + **extra, ) -> None: """ Update job progress (called from worker tasks). Updates both the in-memory gRPC state and the Django database. + Extra kwargs are stored for chunker-specific fields (total_chunks, + processed_chunks, failed_chunks, throughput_mbps, etc.). 
""" if job_id in _active_jobs: _active_jobs[job_id].update( @@ -212,6 +259,7 @@ def update_job_progress( "speed": speed, "status": status, "error": error, + **extra, } ) diff --git a/core/rpc/worker_pb2.py b/core/rpc/worker_pb2.py index 80e125c..0ddf905 100644 --- a/core/rpc/worker_pb2.py +++ b/core/rpc/worker_pb2.py @@ -24,7 +24,7 @@ _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cworker.proto\x12\nmpr.worker\"\xa7\x01\n\nJobRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x13\n\x0bsource_path\x18\x02 \x01(\t\x12\x13\n\x0boutput_path\x18\x03 \x01(\t\x12\x13\n\x0bpreset_json\x18\x04 \x01(\t\x12\x17\n\ntrim_start\x18\x05 \x01(\x02H\x00\x88\x01\x01\x12\x15\n\x08trim_end\x18\x06 \x01(\x02H\x01\x88\x01\x01\x42\r\n\x0b_trim_startB\x0b\n\t_trim_end\"@\n\x0bJobResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08\x61\x63\x63\x65pted\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"!\n\x0fProgressRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\x9c\x01\n\x0eProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08progress\x18\x02 \x01(\x05\x12\x15\n\rcurrent_frame\x18\x03 \x01(\x05\x12\x14\n\x0c\x63urrent_time\x18\x04 \x01(\x02\x12\r\n\x05speed\x18\x05 \x01(\x02\x12\x0e\n\x06status\x18\x06 \x01(\t\x12\x12\n\x05\x65rror\x18\x07 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x1f\n\rCancelRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"D\n\x0e\x43\x61ncelResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x11\n\tcancelled\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"g\n\x0cWorkerStatus\x12\x11\n\tavailable\x18\x01 \x01(\x08\x12\x13\n\x0b\x61\x63tive_jobs\x18\x02 \x01(\x05\x12\x18\n\x10supported_codecs\x18\x03 \x03(\t\x12\x15\n\rgpu_available\x18\x04 
\x01(\x08\"\x07\n\x05\x45mpty2\x9e\x02\n\rWorkerService\x12<\n\tSubmitJob\x12\x16.mpr.worker.JobRequest\x1a\x17.mpr.worker.JobResponse\x12K\n\x0eStreamProgress\x12\x1b.mpr.worker.ProgressRequest\x1a\x1a.mpr.worker.ProgressUpdate0\x01\x12\x42\n\tCancelJob\x12\x19.mpr.worker.CancelRequest\x1a\x1a.mpr.worker.CancelResponse\x12>\n\x0fGetWorkerStatus\x12\x11.mpr.worker.Empty\x1a\x18.mpr.worker.WorkerStatusb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cworker.proto\x12\nmpr.worker\"\xa7\x01\n\nJobRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x13\n\x0bsource_path\x18\x02 \x01(\t\x12\x13\n\x0boutput_path\x18\x03 \x01(\t\x12\x13\n\x0bpreset_json\x18\x04 \x01(\t\x12\x17\n\ntrim_start\x18\x05 \x01(\x02H\x00\x88\x01\x01\x12\x15\n\x08trim_end\x18\x06 \x01(\x02H\x01\x88\x01\x01\x42\r\n\x0b_trim_startB\x0b\n\t_trim_end\"@\n\x0bJobResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08\x61\x63\x63\x65pted\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"!\n\x0fProgressRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\x9c\x01\n\x0eProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08progress\x18\x02 \x01(\x05\x12\x15\n\rcurrent_frame\x18\x03 \x01(\x05\x12\x14\n\x0c\x63urrent_time\x18\x04 \x01(\x02\x12\r\n\x05speed\x18\x05 \x01(\x02\x12\x0e\n\x06status\x18\x06 \x01(\t\x12\x12\n\x05\x65rror\x18\x07 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x1f\n\rCancelRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"D\n\x0e\x43\x61ncelResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x11\n\tcancelled\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"g\n\x0cWorkerStatus\x12\x11\n\tavailable\x18\x01 \x01(\x08\x12\x13\n\x0b\x61\x63tive_jobs\x18\x02 \x01(\x05\x12\x18\n\x10supported_codecs\x18\x03 \x03(\t\x12\x15\n\rgpu_available\x18\x04 \x01(\x08\"\x07\n\x05\x45mpty\"$\n\x12\x43hunkStreamRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\xaa\x02\n\x12\x43hunkPipelineEvent\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x12\n\nevent_type\x18\x02 
\x01(\t\x12\x10\n\x08sequence\x18\x03 \x01(\x05\x12\x11\n\tworker_id\x18\x04 \x01(\t\x12\r\n\x05state\x18\x05 \x01(\t\x12\x12\n\nqueue_size\x18\x06 \x01(\x05\x12\x0f\n\x07\x65lapsed\x18\x07 \x01(\x02\x12\x17\n\x0fthroughput_mbps\x18\x08 \x01(\x02\x12\x14\n\x0ctotal_chunks\x18\t \x01(\x05\x12\x18\n\x10processed_chunks\x18\n \x01(\x05\x12\x15\n\rfailed_chunks\x18\x0b \x01(\x05\x12\r\n\x05\x65rror\x18\x0c \x01(\t\x12\x17\n\x0fprocessing_time\x18\r \x01(\x02\x12\x0f\n\x07retries\x18\x0e \x01(\x05\x32\xf7\x02\n\rWorkerService\x12<\n\tSubmitJob\x12\x16.mpr.worker.JobRequest\x1a\x17.mpr.worker.JobResponse\x12K\n\x0eStreamProgress\x12\x1b.mpr.worker.ProgressRequest\x1a\x1a.mpr.worker.ProgressUpdate0\x01\x12\x42\n\tCancelJob\x12\x19.mpr.worker.CancelRequest\x1a\x1a.mpr.worker.CancelResponse\x12>\n\x0fGetWorkerStatus\x12\x11.mpr.worker.Empty\x1a\x18.mpr.worker.WorkerStatus\x12W\n\x13StreamChunkPipeline\x12\x1e.mpr.worker.ChunkStreamRequest\x1a\x1e.mpr.worker.ChunkPipelineEvent0\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -47,6 +47,10 @@ if not _descriptor._USE_C_DESCRIPTORS: _globals['_WORKERSTATUS']._serialized_end=664 _globals['_EMPTY']._serialized_start=666 _globals['_EMPTY']._serialized_end=673 - _globals['_WORKERSERVICE']._serialized_start=676 - _globals['_WORKERSERVICE']._serialized_end=962 + _globals['_CHUNKSTREAMREQUEST']._serialized_start=675 + _globals['_CHUNKSTREAMREQUEST']._serialized_end=711 + _globals['_CHUNKPIPELINEEVENT']._serialized_start=714 + _globals['_CHUNKPIPELINEEVENT']._serialized_end=1012 + _globals['_WORKERSERVICE']._serialized_start=1015 + _globals['_WORKERSERVICE']._serialized_end=1390 # @@protoc_insertion_point(module_scope) diff --git a/core/rpc/worker_pb2_grpc.py b/core/rpc/worker_pb2_grpc.py index 402cab9..925ca59 100644 --- a/core/rpc/worker_pb2_grpc.py +++ b/core/rpc/worker_pb2_grpc.py @@ -5,7 +5,7 @@ import warnings from . 
import worker_pb2 as worker__pb2 -GRPC_GENERATED_VERSION = '1.76.0' +GRPC_GENERATED_VERSION = '1.78.0' GRPC_VERSION = grpc.__version__ _version_not_supported = False @@ -54,6 +54,11 @@ class WorkerServiceStub(object): request_serializer=worker__pb2.Empty.SerializeToString, response_deserializer=worker__pb2.WorkerStatus.FromString, _registered_method=True) + self.StreamChunkPipeline = channel.unary_stream( + '/mpr.worker.WorkerService/StreamChunkPipeline', + request_serializer=worker__pb2.ChunkStreamRequest.SerializeToString, + response_deserializer=worker__pb2.ChunkPipelineEvent.FromString, + _registered_method=True) class WorkerServiceServicer(object): @@ -83,6 +88,12 @@ class WorkerServiceServicer(object): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def StreamChunkPipeline(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_WorkerServiceServicer_to_server(servicer, server): rpc_method_handlers = { @@ -106,6 +117,11 @@ def add_WorkerServiceServicer_to_server(servicer, server): request_deserializer=worker__pb2.Empty.FromString, response_serializer=worker__pb2.WorkerStatus.SerializeToString, ), + 'StreamChunkPipeline': grpc.unary_stream_rpc_method_handler( + servicer.StreamChunkPipeline, + request_deserializer=worker__pb2.ChunkStreamRequest.FromString, + response_serializer=worker__pb2.ChunkPipelineEvent.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'mpr.worker.WorkerService', rpc_method_handlers) @@ -224,3 +240,30 @@ class WorkerService(object): timeout, metadata, _registered_method=True) + + @staticmethod + def StreamChunkPipeline(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + 
wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/mpr.worker.WorkerService/StreamChunkPipeline', + worker__pb2.ChunkStreamRequest.SerializeToString, + worker__pb2.ChunkPipelineEvent.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/core/schema/modelgen.json b/core/schema/modelgen.json index 80a17a9..dcf1c6d 100644 --- a/core/schema/modelgen.json +++ b/core/schema/modelgen.json @@ -13,8 +13,8 @@ }, { "target": "typescript", - "output": "ui/timeline/src/types.ts", - "include": ["dataclasses", "enums", "api"] + "output": "ui/common/types/generated.ts", + "include": ["dataclasses", "enums", "api", "views"] }, { "target": "protobuf", diff --git a/core/schema/models/__init__.py b/core/schema/models/__init__.py index 188440f..d62b172 100644 --- a/core/schema/models/__init__.py +++ b/core/schema/models/__init__.py @@ -16,6 +16,8 @@ from .grpc import ( GRPC_SERVICE, CancelRequest, CancelResponse, + ChunkPipelineEvent, + ChunkStreamRequest, Empty, JobRequest, JobResponse, @@ -23,12 +25,13 @@ from .grpc import ( ProgressUpdate, WorkerStatus, ) -from .jobs import JobStatus, TranscodeJob +from .jobs import ChunkJob, ChunkJobStatus, JobStatus, TranscodeJob from .media import AssetStatus, MediaAsset from .presets import BUILTIN_PRESETS, TranscodePreset +from .views import ChunkEvent, ChunkOutputFile, PipelineStats, WorkerEvent # Core domain models - generates Django, Pydantic, TypeScript -DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob] +DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob, ChunkJob] # API request/response models - generates TypeScript only (no Django) # WorkerStatus from grpc.py is reused here @@ -42,7 +45,10 @@ API_MODELS = [ ] # Status enums - included in generated code -ENUMS = [AssetStatus, JobStatus] +ENUMS = [AssetStatus, JobStatus, ChunkJobStatus] + 
+# View/event models - generates TypeScript for UI consumption +VIEWS = [ChunkEvent, WorkerEvent, PipelineStats, ChunkOutputFile] # gRPC messages - generates Proto GRPC_MESSAGES = [ @@ -54,6 +60,8 @@ GRPC_MESSAGES = [ CancelResponse, WorkerStatus, Empty, + ChunkStreamRequest, + ChunkPipelineEvent, ] __all__ = [ @@ -61,6 +69,7 @@ __all__ = [ "MediaAsset", "TranscodePreset", "TranscodeJob", + "ChunkJob", # API Models "CreateJobRequest", "UpdateAssetRequest", @@ -70,6 +79,7 @@ __all__ = [ # Enums "AssetStatus", "JobStatus", + "ChunkJobStatus", # gRPC "GRPC_SERVICE", "JobRequest", @@ -80,10 +90,18 @@ __all__ = [ "CancelResponse", "WorkerStatus", "Empty", + "ChunkStreamRequest", + "ChunkPipelineEvent", + # Views + "ChunkEvent", + "WorkerEvent", + "PipelineStats", + "ChunkOutputFile", # For generator "DATACLASSES", "API_MODELS", "ENUMS", + "VIEWS", "GRPC_MESSAGES", "BUILTIN_PRESETS", ] diff --git a/core/schema/models/grpc.py b/core/schema/models/grpc.py index 313bf7b..841c0a3 100644 --- a/core/schema/models/grpc.py +++ b/core/schema/models/grpc.py @@ -41,6 +41,13 @@ class CancelRequest: job_id: str +@dataclass +class ChunkStreamRequest: + """Request to stream chunk pipeline events.""" + + job_id: str + + @dataclass class Empty: """Empty message for requests with no parameters.""" @@ -94,6 +101,26 @@ class WorkerStatus: gpu_available: bool +@dataclass +class ChunkPipelineEvent: + """Streaming chunk pipeline event.""" + + job_id: str + event_type: str # pipeline_start, chunk_queued, chunk_done, etc. 
+ sequence: int = 0 + worker_id: str = "" + state: str = "" + queue_size: int = 0 + elapsed: float = 0.0 + throughput_mbps: float = 0.0 + total_chunks: int = 0 + processed_chunks: int = 0 + failed_chunks: int = 0 + error: str = "" + processing_time: float = 0.0 + retries: int = 0 + + # ----------------------------------------------------------------------------- # Service Definition (for documentation, generator uses this) # ----------------------------------------------------------------------------- @@ -126,5 +153,11 @@ GRPC_SERVICE = { "response": WorkerStatus, "stream_response": False, }, + { + "name": "StreamChunkPipeline", + "request": ChunkStreamRequest, + "response": ChunkPipelineEvent, + "stream_response": True, # Server streaming + }, ], } diff --git a/core/schema/models/jobs.py b/core/schema/models/jobs.py index 8a7f6f0..0957034 100644 --- a/core/schema/models/jobs.py +++ b/core/schema/models/jobs.py @@ -1,13 +1,14 @@ """ -TranscodeJob Schema Definition +Job Schema Definitions -Source of truth for job data model. +Source of truth for job data models. +TranscodeJob and ChunkJob share common lifecycle fields by convention. """ from dataclasses import dataclass, field from datetime import datetime from enum import Enum -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from uuid import UUID @@ -77,3 +78,56 @@ class TranscodeJob: return self.preset_id is None and ( self.trim_start is not None or self.trim_end is not None ) + + +class ChunkJobStatus(str, Enum): + """Status of a chunk pipeline job.""" + + PENDING = "pending" + CHUNKING = "chunking" + PROCESSING = "processing" + COLLECTING = "collecting" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +@dataclass +class ChunkJob: + """ + A chunk pipeline job — splits a media file into chunks and processes them + through a concurrent worker pool. 
+ """ + + id: UUID + + # Input + source_asset_id: UUID + + # Configuration + chunk_duration: float = 10.0 # seconds + num_workers: int = 4 + max_retries: int = 3 + processor_type: str = "ffmpeg" # "ffmpeg", "checksum", "simulated_decode", "composite" + + # Status & Progress + status: ChunkJobStatus = ChunkJobStatus.PENDING + progress: float = 0.0 # 0.0 to 100.0 + total_chunks: int = 0 + processed_chunks: int = 0 + failed_chunks: int = 0 + retry_count: int = 0 + error_message: Optional[str] = None + + # Result stats + throughput_mbps: Optional[float] = None + elapsed_seconds: Optional[float] = None + + # Worker tracking + celery_task_id: Optional[str] = None + priority: int = 0 # Lower = higher priority + + # Timestamps + created_at: Optional[datetime] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None diff --git a/core/schema/models/views.py b/core/schema/models/views.py new file mode 100644 index 0000000..5d9284e --- /dev/null +++ b/core/schema/models/views.py @@ -0,0 +1,57 @@ +""" +View/Event Schema Definitions + +Projections of domain models for UI consumption via SSE events. +These reference existing schema types (e.g., ChunkJobStatus) to maintain +type-level dependencies — if the domain model changes, views update too. 
+""" + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class ChunkEvent: + """SSE event for a single chunk's lifecycle.""" + + sequence: int + status: str + size: Optional[int] = None + worker_id: Optional[str] = None + processing_time: Optional[float] = None + error: Optional[str] = None + retries: int = 0 + + +@dataclass +class WorkerEvent: + """SSE event for worker state changes.""" + + worker_id: str + state: str + current_chunk: Optional[int] = None + processed: int = 0 + errors: int = 0 + retries: int = 0 + + +@dataclass +class PipelineStats: + """Aggregate pipeline statistics, updated via SSE.""" + + total_chunks: int = 0 + processed: int = 0 + failed: int = 0 + retries: int = 0 + elapsed: float = 0.0 + throughput_mbps: float = 0.0 + queue_size: int = 0 + + +@dataclass +class ChunkOutputFile: + """A chunk output file in S3/MinIO with presigned download URL.""" + + key: str + size: int = 0 + url: str = "" diff --git a/core/task/__init__.py b/core/task/__init__.py deleted file mode 100644 index fea2bad..0000000 --- a/core/task/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -MPR Worker Module - -Provides executor abstraction and Celery tasks for job processing. -""" - -from .executor import Executor, LocalExecutor, get_executor -from .tasks import run_transcode_job - -__all__ = [ - "Executor", - "LocalExecutor", - "get_executor", - "run_transcode_job", -] diff --git a/core/task/tasks.py b/core/task/tasks.py deleted file mode 100644 index 5e0713a..0000000 --- a/core/task/tasks.py +++ /dev/null @@ -1,105 +0,0 @@ -""" -Celery tasks for job processing. 
-""" - -import logging -import os -from typing import Any, Dict, Optional - -from celery import shared_task - -from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file -from core.rpc.server import update_job_progress -from core.task.executor import get_executor - -logger = logging.getLogger(__name__) - - -@shared_task(bind=True, queue="transcode", max_retries=3, default_retry_delay=60) -def run_transcode_job( - self, - job_id: str, - source_key: str, - output_key: str, - preset: Optional[Dict[str, Any]] = None, - trim_start: Optional[float] = None, - trim_end: Optional[float] = None, - duration: Optional[float] = None, -) -> Dict[str, Any]: - """ - Celery task to run a transcode/trim job. - - Downloads source from S3, runs FFmpeg, uploads result to S3. - """ - logger.info(f"Starting job {job_id}: {source_key} -> {output_key}") - - update_job_progress(job_id, progress=0, status="processing") - - # Download source from S3 to temp file - logger.info(f"Downloading {source_key} from {BUCKET_IN}") - tmp_source = download_to_temp(BUCKET_IN, source_key) - - # Create temp output path with same extension - import tempfile - from pathlib import Path - - ext = Path(output_key).suffix or ".mp4" - fd, tmp_output = tempfile.mkstemp(suffix=ext) - os.close(fd) - - def progress_callback(percent: int, details: Dict[str, Any]) -> None: - update_job_progress( - job_id, - progress=percent, - current_time=details.get("time", 0.0), - status="processing", - ) - - try: - executor = get_executor() - success = executor.run( - job_id=job_id, - source_path=tmp_source, - output_path=tmp_output, - preset=preset, - trim_start=trim_start, - trim_end=trim_end, - duration=duration, - progress_callback=progress_callback, - ) - - if success: - # Upload result to S3 - logger.info(f"Uploading {output_key} to {BUCKET_OUT}") - upload_file(tmp_output, BUCKET_OUT, output_key) - - logger.info(f"Job {job_id} completed successfully") - update_job_progress(job_id, progress=100, 
status="completed") - return { - "status": "completed", - "job_id": job_id, - "output_key": output_key, - } - else: - raise RuntimeError("Executor returned False") - - except Exception as e: - logger.exception(f"Job {job_id} failed: {e}") - update_job_progress(job_id, progress=0, status="failed", error=str(e)) - - if self.request.retries < self.max_retries: - raise self.retry(exc=e) - - return { - "status": "failed", - "job_id": job_id, - "error": str(e), - } - - finally: - # Clean up temp files - for f in [tmp_source, tmp_output]: - try: - os.unlink(f) - except OSError: - pass diff --git a/ctrl/Dockerfile b/ctrl/Dockerfile index 389484b..247b054 100644 --- a/ctrl/Dockerfile +++ b/ctrl/Dockerfile @@ -6,5 +6,6 @@ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # No COPY . . — code is volume-mounted in dev (..:/app) +# This image only provides the Python runtime + dependencies CMD ["python", "admin/manage.py", "runserver", "0.0.0.0:8000"] diff --git a/ctrl/Dockerfile.worker b/ctrl/Dockerfile.worker index 9260014..e69ff5b 100644 --- a/ctrl/Dockerfile.worker +++ b/ctrl/Dockerfile.worker @@ -10,5 +10,6 @@ COPY requirements.txt requirements-worker.txt ./ RUN pip install --no-cache-dir -r requirements-worker.txt # No COPY . . — code is volume-mounted in dev (..:/app) +# This image only provides Python runtime + FFmpeg + dependencies CMD ["celery", "-A", "admin.mpr", "worker", "--loglevel=info"] diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml index 2c114c7..47c9ae2 100644 --- a/ctrl/docker-compose.yml +++ b/ctrl/docker-compose.yml @@ -17,6 +17,20 @@ x-healthcheck-defaults: &healthcheck-defaults timeout: 5s retries: 5 +x-python-service: &python-service + build: + context: .. 
+ dockerfile: ctrl/Dockerfile + volumes: + - ..:/app + environment: + <<: *common-env + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + services: # ============================================================================= # Infrastructure @@ -75,64 +89,55 @@ services: mc anonymous set download local/mpr-media-in mc anonymous set download local/mpr-media-out + envoy: + image: envoyproxy/envoy:v1.28-latest + ports: + - "8090:8090" + volumes: + - ./envoy.yaml:/etc/envoy/envoy.yaml:ro + depends_on: + - grpc + nginx: image: nginx:alpine ports: - "80:80" volumes: - ./nginx.conf:/etc/nginx/nginx.conf:ro + - ./landing.html:/etc/nginx/landing.html:ro + - ../media/out:/app/media/out:ro depends_on: - django - fastapi - timeline + - chunker - minio + - envoy # ============================================================================= # Application Services # ============================================================================= django: - build: - context: .. - dockerfile: ctrl/Dockerfile + <<: *python-service command: > bash -c "python admin/manage.py migrate && python admin/manage.py loadbuiltins || true && python admin/manage.py runserver 0.0.0.0:8701" ports: - "8701:8701" - environment: - <<: *common-env - volumes: - - ..:/app - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy fastapi: - build: - context: .. - dockerfile: ctrl/Dockerfile + <<: *python-service command: uvicorn core.api.main:app --host 0.0.0.0 --port 8702 --reload ports: - "8702:8702" environment: <<: *common-env DJANGO_ALLOW_ASYNC_UNSAFE: "true" - volumes: - - ..:/app - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy grpc: - build: - context: .. 
- dockerfile: ctrl/Dockerfile + <<: *python-service command: python -m core.rpc.server ports: - "50052:50051" @@ -140,19 +145,12 @@ services: <<: *common-env GRPC_PORT: 50051 GRPC_MAX_WORKERS: 10 - volumes: - - ..:/app - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy celery: build: context: .. dockerfile: ctrl/Dockerfile.worker - command: celery -A admin.mpr worker -l info -Q transcode -c 2 + command: celery -A admin.mpr worker -l info -Q celery,transcode -c 2 environment: <<: *common-env MPR_EXECUTOR: local @@ -176,6 +174,21 @@ services: VITE_ALLOWED_HOSTS: ${VITE_ALLOWED_HOSTS:-} volumes: - ../ui/timeline/src:/app/src + - ../ui/timeline/vite.config.ts:/app/vite.config.ts + - ../ui/common:/common + + chunker: + build: + context: ../ui/chunker + dockerfile: Dockerfile + ports: + - "5174:5174" + environment: + VITE_ALLOWED_HOSTS: ${VITE_ALLOWED_HOSTS:-} + volumes: + - ../ui/chunker/src:/app/src + - ../ui/chunker/vite.config.ts:/app/vite.config.ts + - ../ui/common:/common volumes: postgres-data: diff --git a/ctrl/envoy.yaml b/ctrl/envoy.yaml new file mode 100644 index 0000000..61362e0 --- /dev/null +++ b/ctrl/envoy.yaml @@ -0,0 +1,64 @@ +admin: + address: + socket_address: { address: 0.0.0.0, port_value: 9901 } + +static_resources: + listeners: + - name: listener_0 + address: + socket_address: { address: 0.0.0.0, port_value: 8090 } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + codec_type: auto + stat_prefix: ingress_http + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: ["*"] + routes: + - match: { prefix: "/" } + route: + cluster: grpc_service + timeout: 600s + max_stream_duration: + grpc_timeout_header_max: 600s + cors: + allow_origin_string_match: + - prefix: "*" + allow_methods: GET, PUT, DELETE, POST, OPTIONS + 
allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout + expose_headers: grpc-status,grpc-message + max_age: "1728000" + http_filters: + - name: envoy.filters.http.grpc_web + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb + - name: envoy.filters.http.cors + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: grpc_service + connect_timeout: 5s + type: logical_dns + lb_policy: round_robin + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + load_assignment: + cluster_name: grpc_service + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: grpc + port_value: 50051 diff --git a/ctrl/generate.sh b/ctrl/generate.sh index f3f9337..e3dbe5f 100755 --- a/ctrl/generate.sh +++ b/ctrl/generate.sh @@ -19,4 +19,13 @@ python -m grpc_tools.protoc \ # Fix relative import in generated grpc stub sed -i 's/^import worker_pb2/from . import worker_pb2/' core/rpc/worker_pb2_grpc.py +# Generate TypeScript gRPC-Web client from proto +echo "Generating TypeScript gRPC-Web client..." +cd ui/chunker +npx protoc \ + --ts_out ../common/api/grpc \ + --proto_path ../../core/rpc/protos \ + worker.proto +cd ../.. + echo "Done!" diff --git a/ctrl/lambda/Dockerfile b/ctrl/lambda/Dockerfile index 52c0b82..2869686 100644 --- a/ctrl/lambda/Dockerfile +++ b/ctrl/lambda/Dockerfile @@ -14,8 +14,8 @@ COPY ctrl/lambda/requirements.txt . 
RUN pip install --no-cache-dir -r requirements.txt # Copy application code -COPY core/task/lambda_handler.py ${LAMBDA_TASK_ROOT}/core/task/lambda_handler.py -COPY core/task/__init__.py ${LAMBDA_TASK_ROOT}/core/task/__init__.py +COPY core/jobs/lambda_handler.py ${LAMBDA_TASK_ROOT}/core/jobs/lambda_handler.py +COPY core/jobs/__init__.py ${LAMBDA_TASK_ROOT}/core/jobs/__init__.py COPY core/ ${LAMBDA_TASK_ROOT}/core/ -CMD ["core.task.lambda_handler.handler"] +CMD ["core.jobs.lambda_handler.handler"] diff --git a/ctrl/landing.html b/ctrl/landing.html new file mode 100644 index 0000000..7ecfafc --- /dev/null +++ b/ctrl/landing.html @@ -0,0 +1,107 @@ + + + + + + MPR + + + +
+

MPR

+

Media Processing & Review

+ + +
+ + diff --git a/ctrl/nginx.conf b/ctrl/nginx.conf index 526539e..1cdb4ce 100644 --- a/ctrl/nginx.conf +++ b/ctrl/nginx.conf @@ -21,14 +21,28 @@ http { server timeline:5173; } + upstream chunker { + server chunker:5174; + } + upstream minio { server minio:9000; } + upstream envoy { + server envoy:8090; + } + server { listen 80; server_name mpr.local.ar; + # Landing page + location = / { + root /etc/nginx; + try_files /landing.html =404; + } + # Django Admin location /admin { proxy_pass http://django; @@ -54,7 +68,7 @@ http { } # Timeline UI - location /ui { + location /timeline/ { proxy_pass http://timeline; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; @@ -62,8 +76,17 @@ http { proxy_set_header Connection "upgrade"; } - # Vite HMR websocket - location /@vite { + # Chunker UI + location /chunker/ { + proxy_pass http://chunker; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + } + + # Vite HMR websocket (timeline) + location /timeline/@vite { proxy_pass http://timeline; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; @@ -71,6 +94,15 @@ http { proxy_set_header Host $host; } + # Vite HMR websocket (chunker) + location /chunker/@vite { + proxy_pass http://chunker; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $host; + } + # Media files - proxied from MinIO (local) or S3 (AWS) location /media/in/ { proxy_pass http://minio/mpr-media-in/; @@ -78,16 +110,24 @@ http { } location /media/out/ { - proxy_pass http://minio/mpr-media-out/; - proxy_set_header Host $http_host; + alias /app/media/out/; + autoindex on; } - # Default to Timeline UI - location / { - proxy_pass http://timeline; + # gRPC-Web proxy via Envoy + location /grpc-web/ { + proxy_pass http://envoy/; + proxy_http_version 1.1; proxy_set_header Host $host; - proxy_set_header Upgrade 
$http_upgrade; - proxy_set_header Connection "upgrade"; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_read_timeout 600s; + + # Critical for streaming: disable nginx response buffering + proxy_buffering off; + proxy_cache off; + chunked_transfer_encoding on; } } } diff --git a/docs/architecture/05-chunker-pipeline.md b/docs/architecture/05-chunker-pipeline.md new file mode 100644 index 0000000..04e9e46 --- /dev/null +++ b/docs/architecture/05-chunker-pipeline.md @@ -0,0 +1,290 @@ +# Chunker Pipeline — Execution Path + +## Overview + +The chunker pipeline splits a media file into time-based segments using FFmpeg stream-copy. Events flow from worker threads through Redis and gRPC-Web streaming to the browser UI in real time. + +**7 hops from worker thread to pixel:** + +``` +Worker thread → Pipeline._emit() → event_bridge() → Redis RPUSH + → [50ms poll] gRPC server LRANGE → yield protobuf + → HTTP/2 frame → Envoy (grpc-web filter) + → HTTP/1.1 chunk → nginx (proxy_buffering off) + → fetch ReadableStream → protobuf-ts decode + → setEvents([...prev, evt]) → React re-render +``` + +--- + +## Step 1: Job Creation (Browser → GraphQL → Celery) + +``` +User clicks "Start" + → App.tsx: handleStart(config) + → api.ts: createChunkJob(config) + → POST /graphql (nginx :80 → fastapi:8702) + → graphql.py: Mutation.create_chunk_job() + → core.db: creates ChunkJob row in Postgres + → Celery: run_job.delay(job_type="chunk", job_id=..., payload=...) 
+ → Returns { id, celery_task_id } to browser + → App.tsx: setJobId(id) — triggers gRPC stream subscription +``` + +**Files:** `ui/chunker/src/api.ts`, `core/api/graphql.py`, `core/jobs/task.py` + +--- + +## Step 2: gRPC-Web Stream (Browser → nginx → Envoy → gRPC Server) + +Once `jobId` is set, `useGrpcStream(jobId)` opens a server-streaming RPC: + +``` +useGrpcStream(jobId) fires useEffect + → GrpcWebFetchTransport({ baseUrl: "/grpc-web" }) + → WorkerServiceClient.streamChunkPipeline({ jobId }) + → fetch() POST to /grpc-web/mpr.worker.WorkerService/StreamChunkPipeline + → nginx :80 /grpc-web/ (proxy_pass → envoy:8090, proxy_buffering off) + → Envoy :8090 (grpc_web filter: HTTP/1.1 grpc-web → HTTP/2 native gRPC) + → gRPC server :50051 WorkerServicer.StreamChunkPipeline() + → Enters Redis polling loop (Step 5) +``` + +**Files:** `ui/chunker/src/hooks/useGrpcStream.ts`, `ctrl/nginx.conf`, `ctrl/envoy.yaml`, `core/rpc/server.py` + +**Key nginx config:** `proxy_buffering off` is critical — without it, nginx buffers the upstream response and forwards data only once a buffer fills or the upstream closes, so events reach the browser in delayed batches (or all at once at the end), defeating real-time streaming. + +--- + +## Step 3: Celery Worker → ChunkHandler + +``` +Celery picks up run_job task + → task.py: run_job(job_type="chunk", job_id, payload) + → registry.get_handler("chunk") → ChunkHandler + → chunk.py: ChunkHandler.process(job_id, payload) + → download_to_temp(BUCKET_IN, source_key) — pulls source from MinIO/S3 + → Creates output_dir: /app/media/out/chunks/{job_id}/ + → Constructs event_bridge callback (bridges Pipeline events → Redis) + → pipeline = Pipeline(source, ..., event_callback=event_bridge, output_dir=...) + → pipeline.run() +``` + +**Files:** `core/jobs/task.py`, `core/jobs/handlers/chunk.py` + +The `event_bridge` closure wraps every `Pipeline._emit()` call, forwarding to `push_event(job_id, event_type, data)` which writes to Redis. 
+ +--- + +## Step 4: Pipeline Orchestration (inside Celery worker process) + +`Pipeline.run()` spawns multiple threads: + +``` +pipeline.run(): + │ + ├─ Chunker(source, chunk_duration) + │ → ffprobe source file → gets duration, file_size + │ → calculates total_chunks = ceil(duration / chunk_duration) + │ + ├─ _emit("pipeline_start", {...}) → event_bridge → Redis + ├─ _emit("pipeline_info", {file_size, duration, total_chunks}) → Redis + │ + ├─ Creates ChunkQueue(maxsize=10) + ├─ Creates WorkerPool(num_workers=N, chunk_queue, processor, event_callback) + │ + ├─ pool.start() — spawns N worker threads + │ + ├─ MONITOR THREAD starts (_monitor_progress) + │ → Every 500ms: _emit("pipeline_progress", {elapsed, throughput_mbps}) → Redis + │ + ├─ PRODUCER THREAD starts (_produce_chunks) + │ → Iterates chunker.chunks() → yields Chunk(sequence, start_time, end_time) + │ → For each: chunk_queue.put(chunk) + │ → _emit("chunk_queued", {sequence, start_time, end_time, queue_size}) → Redis + │ → chunk_queue.close() when done (sends N sentinel Nones) + │ + ├─ WORKER THREADS (N concurrent, each runs worker.py:Worker.run()) + │ │ Each worker loops: + │ │ + │ ├─ chunk = chunk_queue.get(timeout=1.0) + │ ├─ _emit("chunk_processing", {sequence, state:"processing", queue_size}) → Redis + │ │ + │ ├─ processor.process(chunk) + │ │ ├─ ffmpeg: runs `ffmpeg -ss start -to end -c copy chunk_NNNN.mp4` + │ │ ├─ simulated_decode: sleep(random) + checksum + │ │ └─ checksum: reads bytes, computes hash + │ │ + │ ├─ On success: _emit("chunk_done", {sequence, processing_time, retries, queue_size}) → Redis + │ ├─ On failure: retries with exponential backoff (0.1s, 0.2s, 0.4s...) 
+ │ │ └─ _emit("chunk_retry", {sequence, attempt, backoff}) → Redis + │ │ └─ _emit("chunk_error", {sequence, error, retries}) → Redis (after exhaustion) + │ │ + │ └─ On sentinel (None): _emit("worker_status", {state:"stopped"}) → Redis + │ + ├─ pool.wait() — joins all worker threads, collects results + ├─ monitor_stop.set() — stops progress monitor + │ + ├─ ResultCollector — reassembles results in sequence order + │ └─ _emit("chunk_collected", {sequence, buffered, emitted}) → Redis + │ + ├─ Writes manifest.json to output_dir + │ + └─ _emit("pipeline_complete", {total_chunks, processed, failed, elapsed, throughput}) → Redis +``` + +**Files:** `core/chunker/pipeline.py`, `core/chunker/worker.py`, `core/chunker/pool.py`, `core/chunker/chunker.py`, `core/chunker/collector.py` + +--- + +## Step 5: Redis — the Event Bus + +``` +WRITE side (Celery worker, all threads): + push_event(job_id, event_type, data) + → json.dumps({"event": event_type, ...data}) + → Redis RPUSH to key "chunk_events:{job_id}" + → Redis EXPIRE 3600 (1 hour TTL) + +READ side (gRPC server, StreamChunkPipeline): + poll_events(job_id, cursor) + → Redis LRANGE "chunk_events:{job_id}" cursor -1 + → Returns (parsed_events, new_cursor) + → Called every 50ms (time.sleep(0.05) in server loop) +``` + +Redis acts as a decoupling layer between the Celery worker process (which runs the pipeline) and the gRPC server process (which streams to browsers). Events are appended with RPUSH and read with cursor-based LRANGE polling. 
+ +**Files:** `core/events.py` + +--- + +## Step 6: gRPC Server → Envoy → nginx → Browser + +``` +server.py: StreamChunkPipeline polling loop: + while context.is_active(): + events, cursor = poll_events(job_id, cursor) ← Redis LRANGE + for data in events: + yield worker_pb2.ChunkPipelineEvent( ← serialized protobuf message + job_id, event_type, sequence, worker_id, + state, queue_size, elapsed, throughput_mbps, + total_chunks, processed_chunks, failed_chunks, + error, processing_time, retries + ) + if event_type in ("pipeline_complete", "pipeline_error"): + return ← ends the stream + time.sleep(0.05) ← 50ms poll interval + + Each yield sends: + → gRPC HTTP/2 DATA frame to Envoy + → Envoy grpc_web filter: HTTP/2 → base64-encoded grpc-web-text + → nginx proxy_pass (proxy_buffering off) → chunked HTTP/1.1 to browser + → fetch() ReadableStream in GrpcWebFetchTransport + → @protobuf-ts decodes protobuf → ChunkPipelineEvent TypeScript object +``` + +**Files:** `core/rpc/server.py`, `ctrl/envoy.yaml`, `ctrl/nginx.conf`, `ui/common/api/grpc/worker.ts`, `ui/common/api/grpc/worker.client.ts` + +--- + +## Step 7: React State Derivation and Rendering + +``` +useGrpcStream.ts: + for await (const msg of stream.responses): + const evt = toEvent(msg) ← maps protobuf camelCase → snake_case PipelineEvent + setEvents(prev => [...prev, evt]) ← appends to events array + if pipeline_complete/error → setDone(true), break + +App.tsx useMemo(events): + Iterates ALL events on every update, derives: + ├─ chunkMap: Map — state machine per chunk + │ pending → queued → processing → done/error/retry + ├─ workerMap: Map — state per worker + │ idle → processing → idle → ... 
→ stopped + ├─ stats: PipelineStats + │ total_chunks, processed, failed, retries, elapsed, throughput_mbps, queue_size + ├─ errors: ErrorEntry[] — every event containing an error field + └─ queueSize: number — last seen queue_size value + + Renders: + ├─ ChunkGrid — colored cells per chunk (pending/queued/processing/done/error) + ├─ QueueGauge — current queue depth / max + ├─ WorkerPanel — per-worker state + current chunk assignment + ├─ StatsPanel — elapsed time, throughput, processed/failed counts + ├─ ErrorLog — scrollable error list + └─ OutputFiles — download links (when done) +``` + +**Files:** `ui/chunker/src/hooks/useGrpcStream.ts`, `ui/chunker/src/App.tsx` + +--- + +## Step 8: Output File Access (after pipeline completes) + +``` +App.tsx useEffect([done, jobId]): + → api.ts: getChunkOutputFiles(jobId) + → POST /graphql → graphql.py: chunk_output_files(job_id) + → Reads /app/media/out/chunks/{job_id}/ directory listing from disk + → Returns [{key, size, url: "/media/out/chunks/{job_id}/chunk_0001.mp4"}] + → Browser renders download links + → Click link → nginx /media/out/ → alias /app/media/out/ → serves file from disk +``` + +Chunks are written directly to `media/out/chunks/{job_id}/` by the ffmpeg processor — no MinIO upload needed for output. Nginx serves them with `autoindex on`. 
+ +**Files:** `core/api/graphql.py`, `core/jobs/handlers/chunk.py`, `ctrl/nginx.conf` + +--- + +## Event Types Reference + +| Event | Source | Key Fields | +|-------|--------|------------| +| `pipeline_start` | Pipeline.run() | source, chunk_duration, num_workers, processor_type | +| `pipeline_info` | Pipeline.run() | file_size, source_duration, total_chunks | +| `pipeline_progress` | Monitor thread (500ms) | elapsed, throughput_mbps | +| `chunk_queued` | Producer thread | sequence, start_time, end_time, duration, queue_size | +| `chunk_processing` | Worker thread | sequence, worker_id, state, queue_size | +| `chunk_done` | Worker thread | sequence, processing_time, retries, queue_size | +| `chunk_retry` | Worker thread | sequence, attempt, backoff | +| `chunk_error` | Worker thread | sequence, error, retries | +| `chunk_collected` | ResultCollector | sequence, buffered, emitted | +| `worker_status` | Worker thread | worker_id, state (idle/processing/stopped) | +| `pipeline_complete` | Pipeline.run() | total_chunks, processed, failed, elapsed, throughput_mbps | +| `pipeline_error` | Pipeline.run() | error | + +--- + +## Thread Model (inside Celery worker) + +``` +Celery worker process + └─ run_job task thread + └─ Pipeline.run() + ├─ Producer thread — enqueues chunks + ├─ Monitor thread — emits progress every 500ms + ├─ Worker thread 0 — pulls from queue, processes + ├─ Worker thread 1 — pulls from queue, processes + ├─ Worker thread 2 — pulls from queue, processes + └─ Worker thread 3 — pulls from queue, processes +``` + +All threads share the same `event_callback` → `event_bridge` → `push_event()`, which creates a new Redis connection per call. Thread-safe via Redis atomic RPUSH. 
+ +--- + +## Infrastructure + +| Service | Port | Role | +|---------|------|------| +| nginx | 80 | Reverse proxy, static file serving | +| fastapi | 8702 | GraphQL API (Strawberry) | +| celery | — | Task worker (runs pipeline) | +| redis | 6379 | Event bus + Celery broker | +| grpc | 50051 | gRPC server (StreamChunkPipeline) | +| envoy | 8090 | gRPC-Web ↔ native gRPC translation | +| minio | 9000 | S3-compatible source media storage | +| postgres | 5432 | Job/asset metadata | diff --git a/docs/architecture/index.html b/docs/architecture/index.html deleted file mode 100644 index e6651e7..0000000 --- a/docs/architecture/index.html +++ /dev/null @@ -1,212 +0,0 @@ - - - - - - MPR - Architecture - - - -

MPR - Media Processor

-

- Media transcoding platform with dual execution modes: local (Celery - + MinIO) and cloud (AWS Step Functions + Lambda + S3). -

- - - -

System Overview

-
-
-

Local Architecture (Development)

- - Local Architecture - - Open full size -
-
-

AWS Architecture (Production)

- - AWS Architecture - - Open full size -
-
- -
-

Components

-
    -
  • - - Reverse Proxy (nginx) -
  • -
  • - - Application Layer (Django Admin, GraphQL API, Timeline UI) -
  • -
  • - - Worker Layer (Celery local mode) -
  • -
  • - - AWS (Step Functions, Lambda - cloud mode) -
  • -
  • - - Data Layer (PostgreSQL, Redis) -
  • -
  • - - S3 Storage (MinIO local / AWS S3 cloud) -
  • -
-
- -

Data Model

-
-
-

Entity Relationships

- - Data Model - - Open full size -
-
- -
-

Entities

-
    -
  • - - MediaAsset - Video/audio files (S3 keys as paths) -
  • -
  • - - TranscodePreset - Encoding configurations -
  • -
  • - - TranscodeJob - Processing queue (celery_task_id or - execution_arn) -
  • -
-
- -

Job Flow

-
-
-

Job Lifecycle

- - Job Flow - - Open full size -
-
- -
-

Job States

-
    -
  • - - PENDING - Waiting in queue -
  • -
  • - - PROCESSING - Worker executing -
  • -
  • - - COMPLETED - Success -
  • -
  • - - FAILED - Error occurred -
  • -
  • - - CANCELLED - User cancelled -
  • -
-

Execution Modes

-
    -
  • - - Local: Celery + MinIO (S3 API) + FFmpeg -
  • -
  • - - Lambda: Step Functions + Lambda + AWS S3 -
  • -
-
- -

Media Storage

-
-

- MPR separates media into input and output paths for flexible - storage configuration. -

-

- View Media Storage Documentation → -

-
- -

API (GraphQL)

-
# GraphiQL IDE
-http://mpr.local.ar/graphql
-
-# Queries
-query { assets(status: "ready") { id filename duration } }
-query { jobs(status: "processing") { id status progress } }
-query { presets { id name container videoCodec } }
-query { systemStatus { status version } }
-
-# Mutations
-mutation { scanMediaFolder { found registered skipped } }
-mutation { createJob(input: { sourceAssetId: "...", presetId: "..." }) { id status } }
-mutation { cancelJob(id: "...") { id status } }
-mutation { retryJob(id: "...") { id status } }
-mutation { updateAsset(id: "...", input: { comments: "..." }) { id comments } }
-mutation { deleteAsset(id: "...") { ok } }
-
-# Lambda callback (REST)
-POST /api/jobs/{id}/callback      - Lambda completion webhook
- -

Access Points

-
# Local development
-127.0.0.1 mpr.local.ar
-http://mpr.local.ar/admin         - Django Admin
-http://mpr.local.ar/graphql       - GraphiQL
-http://mpr.local.ar/              - Timeline UI
-http://localhost:9001              - MinIO Console
-
-# AWS deployment
-https://mpr.mcrn.ar/              - Production
- -

Quick Reference

-
# Render SVGs from DOT files
-for f in *.dot; do dot -Tsvg "$f" -o "${f%.dot}.svg"; done
-
-# Switch executor mode
-MPR_EXECUTOR=local    # Celery + MinIO
-MPR_EXECUTOR=lambda   # Step Functions + Lambda + S3
- - diff --git a/docs/architecture/styles.css b/docs/architecture/styles.css index ef23579..b3094f2 100644 --- a/docs/architecture/styles.css +++ b/docs/architecture/styles.css @@ -3,6 +3,8 @@ --text-color: #e8e8e8; --accent-color: #4a90d9; --border-color: #333; + --sidebar-width: 220px; + --sidebar-bg: #151528; } * { @@ -16,6 +18,59 @@ body { background-color: var(--bg-color); color: var(--text-color); line-height: 1.6; +} + +/* Sidebar navigation */ +.sidebar { + position: fixed; + top: 0; + left: 0; + width: var(--sidebar-width); + height: 100vh; + background: var(--sidebar-bg); + border-right: 1px solid var(--border-color); + padding: 1.5rem 1rem; + overflow-y: auto; + z-index: 10; +} + +.sidebar h2 { + font-size: 1.2rem; + color: var(--accent-color); + margin-bottom: 1.5rem; + padding-bottom: 0.5rem; + border-bottom: 1px solid var(--border-color); +} + +.sidebar ul { + list-style: none; + display: flex; + flex-direction: column; + gap: 0.25rem; +} + +.sidebar li { + display: block; +} + +.sidebar a { + display: block; + padding: 0.4rem 0.6rem; + color: var(--text-color); + text-decoration: none; + font-size: 0.85rem; + border-radius: 4px; + transition: background 0.15s, color 0.15s; +} + +.sidebar a:hover { + background: rgba(74, 144, 217, 0.15); + color: var(--accent-color); +} + +/* Main content */ +.content { + margin-left: var(--sidebar-width); padding: 2rem; } @@ -25,12 +80,13 @@ h1 { color: var(--accent-color); } -h2 { +.content > h2 { font-size: 1.5rem; margin: 2rem 0 1rem; color: var(--text-color); border-bottom: 1px solid var(--border-color); padding-bottom: 0.5rem; + scroll-margin-top: 1rem; } .diagram-container { @@ -76,20 +132,6 @@ h2 { text-decoration: underline; } -nav { - margin-bottom: 2rem; -} - -nav a { - color: var(--accent-color); - text-decoration: none; - margin-right: 1.5rem; -} - -nav a:hover { - text-decoration: underline; -} - .legend { margin-top: 2rem; padding: 1rem; @@ -141,3 +183,27 @@ pre code { background: none; padding: 0; } + 
+/* Responsive: collapse sidebar on small screens */ +@media (max-width: 768px) { + .sidebar { + position: static; + width: 100%; + height: auto; + border-right: none; + border-bottom: 1px solid var(--border-color); + } + + .sidebar ul { + flex-direction: row; + flex-wrap: wrap; + } + + .content { + margin-left: 0; + } + + .diagram { + min-width: 100%; + } +} diff --git a/docs/index.html b/docs/index.html index 7b5f774..413af1a 100644 --- a/docs/index.html +++ b/docs/index.html @@ -7,219 +7,241 @@ -

MPR - Media Processor

-

- Media transcoding platform with three execution modes: local (Celery - + MinIO), AWS (Step Functions + Lambda + S3), and GCP (Cloud Run - Jobs + GCS). Storage is S3-compatible across all environments. -

- -