Add chunker pipeline (chunk/process/reassemble) with SSE progress endpoint and GraphQL/UI integration
This commit is contained in:
78
core/api/chunker_sse.py
Normal file
78
core/api/chunker_sse.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""
|
||||
SSE endpoint for chunker pipeline events.
|
||||
|
||||
Bridges gRPC StreamProgress to browser-native EventSource.
|
||||
GET /api/chunker/stream/{job_id} → text/event-stream
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from fastapi import APIRouter
|
||||
from starlette.responses import StreamingResponse
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/chunker", tags=["chunker"])
|
||||
|
||||
|
||||
async def _event_generator(job_id: str) -> AsyncGenerator[str, None]:
    """
    Generate SSE events by polling gRPC job state.

    Polls the in-process job registry and emits a frame whenever the
    job's state dict changes.

    Yields server-sent events in the format:
        event: <event_type>
        data: <json_payload>

    Terminal frames: a "done" event after completed/failed/cancelled,
    or a single "timeout" event if the job outlives the 10-minute window
    (never both — see the return below).
    """
    # Local import: the rpc server module is only needed at stream time.
    from core.rpc.server import _active_jobs

    last_state = None
    deadline = time.monotonic() + 600  # 10 min max

    while time.monotonic() < deadline:
        job_state = _active_jobs.get(job_id)

        if job_state is None:
            # Job not found yet — may not have started
            yield f"event: waiting\ndata: {json.dumps({'job_id': job_id})}\n\n"
            await asyncio.sleep(0.5)
            continue

        # Only send if state changed
        if job_state != last_state:
            # Snapshot: the job dict may be mutated concurrently by the worker.
            last_state = dict(job_state)
            event_type = job_state.get("status", "update")

            yield f"event: {event_type}\ndata: {json.dumps({**job_state, 'job_id': job_id})}\n\n"

            # End stream when job is terminal
            if event_type in ("completed", "failed", "cancelled"):
                yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n"
                # BUG FIX: return (not break) — a break would fall through to
                # the post-loop "timeout" frame and emit it after "done".
                return

        await asyncio.sleep(0.2)

    yield f"event: timeout\ndata: {json.dumps({'job_id': job_id})}\n\n"
|
||||
|
||||
|
||||
@router.get("/stream/{job_id}")
async def stream_chunk_job(job_id: str):
    """
    SSE stream for a chunk pipeline job.

    The UI connects via native EventSource:
        const es = new EventSource('/api/chunker/stream/<job_id>');
        es.addEventListener('processing', (e) => { ... });
    """
    # Disable caching and proxy buffering so frames reach the browser
    # as soon as the generator yields them.
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        _event_generator(job_id),
        media_type="text/event-stream",
        headers=sse_headers,
    )
|
||||
@@ -15,6 +15,8 @@ from strawberry.schema.config import StrawberryConfig
|
||||
from strawberry.types import Info
|
||||
|
||||
from core.api.schema.graphql import (
|
||||
ChunkJobType,
|
||||
CreateChunkJobInput,
|
||||
CreateJobInput,
|
||||
DeleteResultType,
|
||||
MediaAssetType,
|
||||
@@ -172,30 +174,31 @@ class Mutation:
|
||||
priority=input.priority or 0,
|
||||
)
|
||||
|
||||
payload = {
|
||||
"source_key": source.file_path,
|
||||
"output_key": output_filename,
|
||||
"preset": preset_snapshot or None,
|
||||
"trim_start": input.trim_start,
|
||||
"trim_end": input.trim_end,
|
||||
"duration": source.duration,
|
||||
}
|
||||
|
||||
executor_mode = os.environ.get("MPR_EXECUTOR", "local")
|
||||
if executor_mode in ("lambda", "gcp"):
|
||||
from core.task.executor import get_executor
|
||||
from core.jobs.executor import get_executor
|
||||
|
||||
get_executor().run(
|
||||
job_type="transcode",
|
||||
job_id=str(job.id),
|
||||
source_path=source.file_path,
|
||||
output_path=output_filename,
|
||||
preset=preset_snapshot or None,
|
||||
trim_start=input.trim_start,
|
||||
trim_end=input.trim_end,
|
||||
duration=source.duration,
|
||||
payload=payload,
|
||||
)
|
||||
else:
|
||||
from core.task.tasks import run_transcode_job
|
||||
from core.jobs.task import run_job
|
||||
|
||||
result = run_transcode_job.delay(
|
||||
result = run_job.delay(
|
||||
job_type="transcode",
|
||||
job_id=str(job.id),
|
||||
source_key=source.file_path,
|
||||
output_key=output_filename,
|
||||
preset=preset_snapshot or None,
|
||||
trim_start=input.trim_start,
|
||||
trim_end=input.trim_end,
|
||||
duration=source.duration,
|
||||
payload=payload,
|
||||
)
|
||||
job.celery_task_id = result.id
|
||||
job.save(update_fields=["celery_task_id"])
|
||||
@@ -261,6 +264,62 @@ class Mutation:
|
||||
except Exception:
|
||||
raise Exception("Asset not found")
|
||||
|
||||
@strawberry.mutation
def create_chunk_job(self, info: Info, input: CreateChunkJobInput) -> ChunkJobType:
    """Create and dispatch a chunk pipeline job."""
    import uuid

    from core.db import get_asset

    # Resolve the source asset first; dispatching for a missing asset
    # would start an orphaned pipeline run.
    try:
        source = get_asset(input.source_asset_id)
    except Exception:
        raise Exception("Source asset not found")

    job_id = str(uuid.uuid4())

    payload = {
        "source_key": source.file_path,
        "chunk_duration": input.chunk_duration,
        "num_workers": input.num_workers,
        "max_retries": input.max_retries,
        "processor_type": input.processor_type,
    }

    celery_task_id = None
    executor_mode = os.environ.get("MPR_EXECUTOR", "local")

    if executor_mode not in ("lambda", "gcp"):
        # Default path: dispatch through Celery and record the task id.
        from core.jobs.task import run_job

        async_result = run_job.delay(
            job_type="chunk",
            job_id=job_id,
            payload=payload,
        )
        celery_task_id = async_result.id
    else:
        # Serverless executors run the job directly; no Celery task id.
        from core.jobs.executor import get_executor

        get_executor().run(
            job_type="chunk",
            job_id=job_id,
            payload=payload,
        )

    return ChunkJobType(
        id=uuid.UUID(job_id),
        source_asset_id=input.source_asset_id,
        chunk_duration=input.chunk_duration,
        num_workers=input.num_workers,
        max_retries=input.max_retries,
        processor_type=input.processor_type,
        status="pending",
        progress=0.0,
        priority=input.priority,
        celery_task_id=celery_task_id,
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schema
|
||||
|
||||
@@ -23,6 +23,7 @@ from fastapi import FastAPI, Header, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from strawberry.fastapi import GraphQLRouter
|
||||
|
||||
from core.api.chunker_sse import router as chunker_router
|
||||
from core.api.graphql import schema as graphql_schema
|
||||
|
||||
CALLBACK_API_KEY = os.environ.get("CALLBACK_API_KEY", "")
|
||||
@@ -48,6 +49,9 @@ app.add_middleware(
|
||||
graphql_router = GraphQLRouter(schema=graphql_schema, graphql_ide="graphiql")
|
||||
app.include_router(graphql_router, prefix="/graphql")
|
||||
|
||||
# Chunker SSE
|
||||
app.include_router(chunker_router)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
def root():
|
||||
|
||||
@@ -156,3 +156,52 @@ class WorkerStatusType:
|
||||
active_jobs: Optional[int] = None
|
||||
supported_codecs: Optional[List[str]] = None
|
||||
gpu_available: Optional[bool] = None
|
||||
|
||||
|
||||
@strawberry.enum
class ChunkJobStatus(Enum):
    """Lifecycle states of a chunk pipeline job."""

    PENDING = "pending"        # created, not yet picked up
    CHUNKING = "chunking"      # source being split into chunks
    PROCESSING = "processing"  # workers processing chunks
    COLLECTING = "collecting"  # results being reassembled in order
    COMPLETED = "completed"    # terminal: success
    FAILED = "failed"          # terminal: error
    CANCELLED = "cancelled"    # terminal: cancelled
|
||||
|
||||
|
||||
@strawberry.type
class ChunkJobType:
    """A chunk pipeline job.

    All fields are optional so the type can describe partial state
    (e.g. a freshly created job before any progress is reported).
    """

    # Identity / configuration
    id: Optional[UUID] = None
    source_asset_id: Optional[UUID] = None
    chunk_duration: Optional[float] = None
    num_workers: Optional[int] = None
    max_retries: Optional[int] = None
    processor_type: Optional[str] = None
    # Live progress
    status: Optional[str] = None
    progress: Optional[float] = None
    total_chunks: Optional[int] = None
    processed_chunks: Optional[int] = None
    failed_chunks: Optional[int] = None
    retry_count: Optional[int] = None
    error_message: Optional[str] = None
    throughput_mbps: Optional[float] = None
    elapsed_seconds: Optional[float] = None
    # Dispatch / bookkeeping
    celery_task_id: Optional[str] = None  # unset when dispatched via serverless executor
    priority: Optional[int] = None
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
|
||||
|
||||
|
||||
@strawberry.input
class CreateChunkJobInput:
    """Request body for creating a chunk pipeline job."""

    source_asset_id: UUID
    chunk_duration: float = 10.0  # seconds per chunk
    num_workers: int = 4          # concurrent worker threads
    max_retries: int = 3          # per-chunk retry budget
    processor_type: str = "ffmpeg"
    priority: int = 0
|
||||
|
||||
64
core/chunker/__init__.py
Normal file
64
core/chunker/__init__.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""
|
||||
Chunker pipeline — splits files into chunks, processes concurrently, reassembles in order.
|
||||
|
||||
Public API:
|
||||
Pipeline — orchestrates the full pipeline
|
||||
PipelineResult — aggregate result dataclass
|
||||
Chunker — file → Chunk generator
|
||||
ChunkQueue — bounded thread-safe queue
|
||||
WorkerPool — manages N worker threads
|
||||
ResultCollector — heapq-based ordered reassembly
|
||||
"""
|
||||
|
||||
from .chunker import Chunker
|
||||
from .collector import ResultCollector
|
||||
from .exceptions import (
|
||||
ChunkChecksumError,
|
||||
ChunkError,
|
||||
ChunkReadError,
|
||||
PipelineError,
|
||||
ProcessingError,
|
||||
ProcessorFailureError,
|
||||
ProcessorTimeoutError,
|
||||
ReassemblyError,
|
||||
)
|
||||
from .models import Chunk, ChunkResult, PipelineResult
|
||||
from .pipeline import Pipeline
|
||||
from .pool import WorkerPool
|
||||
from .processor import (
|
||||
ChecksumProcessor,
|
||||
CompositeProcessor,
|
||||
FFmpegExtractProcessor,
|
||||
Processor,
|
||||
SimulatedDecodeProcessor,
|
||||
)
|
||||
from .queue import ChunkQueue
|
||||
|
||||
__all__ = [
|
||||
# Core
|
||||
"Pipeline",
|
||||
"PipelineResult",
|
||||
# Components
|
||||
"Chunker",
|
||||
"ChunkQueue",
|
||||
"WorkerPool",
|
||||
"ResultCollector",
|
||||
# Models
|
||||
"Chunk",
|
||||
"ChunkResult",
|
||||
# Processors
|
||||
"Processor",
|
||||
"ChecksumProcessor",
|
||||
"SimulatedDecodeProcessor",
|
||||
"CompositeProcessor",
|
||||
"FFmpegExtractProcessor",
|
||||
# Exceptions
|
||||
"PipelineError",
|
||||
"ChunkError",
|
||||
"ChunkReadError",
|
||||
"ChunkChecksumError",
|
||||
"ProcessingError",
|
||||
"ProcessorFailureError",
|
||||
"ProcessorTimeoutError",
|
||||
"ReassemblyError",
|
||||
]
|
||||
86
core/chunker/chunker.py
Normal file
86
core/chunker/chunker.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""
|
||||
Chunker — probes a media file and yields time-based Chunk objects.
|
||||
|
||||
Demonstrates:
|
||||
- Function parameters and defaults (Interview Topic 1)
|
||||
- List comprehensions and efficient iteration / generators (Interview Topic 3)
|
||||
"""
|
||||
|
||||
import math
|
||||
import os
|
||||
from typing import Generator
|
||||
|
||||
from core.ffmpeg.probe import probe_file
|
||||
|
||||
from .exceptions import ChunkReadError
|
||||
from .models import Chunk
|
||||
|
||||
|
||||
class Chunker:
    """
    Splits a media file into time-based chunks via a generator.

    Uses FFmpeg probe to get the source duration, then lazily yields
    Chunk objects describing time segments. No media data is read here —
    extraction happens later in the processor.

    Args:
        file_path: Path to the source media file
        chunk_duration: Duration of each chunk in seconds (default: 10.0)

    Raises:
        ChunkReadError: If the file is missing or cannot be probed.
        ValueError: If chunk_duration is not positive.
    """

    def __init__(self, file_path: str, chunk_duration: float = 10.0):
        if not os.path.isfile(file_path):
            raise ChunkReadError(f"File not found: {file_path}")
        if chunk_duration <= 0:
            raise ValueError("chunk_duration must be positive")

        self.file_path = file_path
        self.chunk_duration = chunk_duration
        self.file_size = os.path.getsize(file_path)
        self.source_duration = self._probe_duration()

    def _probe_duration(self) -> float:
        """Return the source duration in seconds via FFmpeg probe."""
        try:
            probed = probe_file(self.file_path)
        except ChunkReadError:
            raise
        except Exception as exc:
            raise ChunkReadError(
                f"Failed to probe {self.file_path}: {exc}"
            ) from exc

        if probed.duration is None or probed.duration <= 0:
            raise ChunkReadError(
                f"Cannot determine duration for {self.file_path}"
            )
        return probed.duration

    @property
    def expected_chunks(self) -> int:
        """Expected number of chunks (the last one may be shorter)."""
        if self.source_duration <= 0:
            return 0
        return math.ceil(self.source_duration / self.chunk_duration)

    def chunks(self) -> "Generator[Chunk, None, None]":
        """
        Lazily yield Chunk objects covering the source file in order.

        Each chunk describes a time range only; actual extraction is
        deferred to the processor stage.
        """
        step = self.chunk_duration
        for idx in range(self.expected_chunks):
            # Multiply rather than accumulate so float error does not drift.
            begin = idx * step
            finish = min(begin + step, self.source_duration)

            yield Chunk(
                sequence=idx,
                start_time=begin,
                end_time=finish,
                source_path=self.file_path,
                duration=finish - begin,
            )
|
||||
98
core/chunker/collector.py
Normal file
98
core/chunker/collector.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
ResultCollector — reassembles chunk results in sequence order using a min-heap.
|
||||
|
||||
Demonstrates:
|
||||
- Algorithms and sorting (Interview Topic 6) — heapq for ordered reassembly
|
||||
- Core data structures (Interview Topic 5) — heap, deque
|
||||
"""
|
||||
|
||||
import heapq
|
||||
from collections import deque
|
||||
from typing import List
|
||||
|
||||
from .exceptions import ReassemblyError
|
||||
from .models import ChunkResult
|
||||
|
||||
|
||||
class ResultCollector:
    """
    Receives ChunkResults out of order and emits them in sequence order.

    A min-heap keyed on sequence number buffers out-of-order arrivals;
    a result is only released once every earlier sequence has been seen.

    Args:
        total_chunks: Expected total number of chunks
    """

    def __init__(self, total_chunks: int):
        self.total_chunks = total_chunks
        self._heap: "List[tuple[int, ChunkResult]]" = []
        self._next_sequence = 0
        self._emitted: "List[ChunkResult]" = []
        self._seen_sequences: "set[int]" = set()
        # Sliding window of recent processing times for throughput stats.
        self._recent_times: "deque[float]" = deque(maxlen=50)

    def add(self, result: "ChunkResult") -> "List[ChunkResult]":
        """
        Register a result and return whatever is now emittable, in order.

        Args:
            result: A ChunkResult (may arrive out of order)

        Returns:
            Zero or more results releasable in sequence order (empty while
            earlier sequences are still outstanding).

        Raises:
            ReassemblyError: If this sequence number was already seen.
        """
        seq = result.sequence
        if seq in self._seen_sequences:
            raise ReassemblyError(
                f"Duplicate sequence number: {seq}"
            )
        self._seen_sequences.add(seq)

        # Track processing time for the throughput window.
        if result.processing_time > 0:
            self._recent_times.append(result.processing_time)

        heapq.heappush(self._heap, (seq, result))

        # Drain every consecutive result starting at _next_sequence.
        released = []
        while self._heap and self._heap[0][0] == self._next_sequence:
            ready = heapq.heappop(self._heap)[1]
            self._emitted.append(ready)
            released.append(ready)
            self._next_sequence += 1

        return released

    @property
    def is_complete(self) -> bool:
        """True if all expected chunks have been emitted in order."""
        return self._next_sequence == self.total_chunks

    @property
    def buffered_count(self) -> int:
        """Number of results waiting in the heap (arrived out of order)."""
        return len(self._heap)

    @property
    def emitted_count(self) -> int:
        """Number of results emitted in sequence order."""
        return len(self._emitted)

    @property
    def avg_processing_time(self) -> float:
        """Average processing time from recent results (sliding window)."""
        if self._recent_times:
            return sum(self._recent_times) / len(self._recent_times)
        return 0.0

    def get_ordered_results(self) -> "List[ChunkResult]":
        """Get all emitted results in sequence order (a copy)."""
        return list(self._emitted)
|
||||
64
core/chunker/exceptions.py
Normal file
64
core/chunker/exceptions.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""
|
||||
Chunker exception hierarchy.
|
||||
|
||||
Demonstrates: Managing exceptions and writing resilient code (Interview Topic 7).
|
||||
"""
|
||||
|
||||
|
||||
class PipelineError(Exception):
    """Base exception for all chunker pipeline errors."""
|
||||
|
||||
|
||||
class ChunkError(PipelineError):
    """Errors related to chunk creation or validation."""
|
||||
|
||||
|
||||
class ChunkReadError(ChunkError):
    """Failed to read chunk data from source file."""
|
||||
|
||||
|
||||
class ChunkChecksumError(ChunkError):
    """Chunk data integrity validation failed."""

    def __init__(self, sequence: int, expected: str, actual: str):
        # Keep raw values for programmatic handling by callers.
        self.sequence = sequence
        self.expected = expected
        self.actual = actual
        message = (
            f"Chunk {sequence}: checksum mismatch "
            f"(expected={expected}, actual={actual})"
        )
        super().__init__(message)
|
||||
|
||||
|
||||
class ProcessingError(PipelineError):
    """Errors during chunk processing by workers."""
|
||||
|
||||
|
||||
class ProcessorTimeoutError(ProcessingError):
    """Processor exceeded allowed time for a chunk."""

    def __init__(self, sequence: int, timeout: float):
        self.sequence = sequence
        self.timeout = timeout
        message = f"Chunk {sequence}: processor timed out after {timeout}s"
        super().__init__(message)
|
||||
|
||||
|
||||
class ProcessorFailureError(ProcessingError):
    """Processor failed to process a chunk after all retries."""

    def __init__(self, sequence: int, retries: int, original_error: Exception):
        # Preserve the triggering exception for callers that inspect it.
        self.sequence = sequence
        self.retries = retries
        self.original_error = original_error
        message = (
            f"Chunk {sequence}: failed after {retries} retries — {original_error}"
        )
        super().__init__(message)
|
||||
|
||||
|
||||
class ReassemblyError(PipelineError):
    """Errors during result collection and ordering."""
|
||||
54
core/chunker/models.py
Normal file
54
core/chunker/models.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Internal data models for the chunker pipeline.
|
||||
|
||||
These are pipeline-internal dataclasses, not schema models.
|
||||
Schema-level ChunkJob is in core/schema/models/jobs.py.
|
||||
|
||||
Demonstrates: Core data structures (Interview Topic 5).
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
@dataclass
class Chunk:
    """A time-based segment of the source media file.

    Describes a time range only — no media data is held; extraction
    is performed later by a processor.
    """

    sequence: int  # 0-based position within the source
    start_time: float  # seconds
    end_time: float  # seconds
    source_path: str  # path to source file
    duration: float  # end_time - start_time
    checksum: str = ""  # computed after extraction
|
||||
|
||||
|
||||
@dataclass
class ChunkResult:
    """Result of processing a single chunk."""

    sequence: int  # sequence of the chunk this result belongs to
    success: bool  # whether processing succeeded
    checksum_valid: bool = True  # integrity-check outcome
    processing_time: float = 0.0  # seconds spent processing
    error: Optional[str] = None  # error description, if any
    retries: int = 0  # retry attempts consumed
    worker_id: Optional[str] = None  # id of the worker that produced this
    output_file: Optional[str] = None  # extracted segment path, if any
|
||||
|
||||
|
||||
@dataclass
class PipelineResult:
    """Aggregate result of the entire pipeline run."""

    total_chunks: int = 0  # expected chunk count
    processed: int = 0  # results received from workers
    failed: int = 0  # results that did not succeed
    retries: int = 0  # total retries across all chunks
    elapsed_time: float = 0.0  # wall-clock seconds for the run
    throughput_mbps: float = 0.0  # source megabytes per elapsed second
    worker_stats: Dict[str, Any] = field(default_factory=dict)  # per-worker counters
    errors: List[str] = field(default_factory=list)  # messages from failed chunks
    chunks_in_order: bool = True  # all chunks reassembled in sequence order
    output_dir: Optional[str] = None  # segment/manifest directory, if configured
    chunk_files: List[str] = field(default_factory=list)  # successful output paths
|
||||
244
core/chunker/pipeline.py
Normal file
244
core/chunker/pipeline.py
Normal file
@@ -0,0 +1,244 @@
|
||||
"""
|
||||
Pipeline — orchestrates the entire chunker pipeline.
|
||||
|
||||
Wires: Chunker → ChunkQueue → WorkerPool → ResultCollector → PipelineResult
|
||||
|
||||
Demonstrates:
|
||||
- Function parameters and defaults (Interview Topic 1) — configurable pipeline
|
||||
- Concurrency (Interview Topic 2) — producer thread + worker pool
|
||||
- OOP design (Interview Topic 4) — composition of pipeline components
|
||||
- Exception handling (Interview Topic 7) — graceful error propagation
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from .chunker import Chunker
|
||||
from .collector import ResultCollector
|
||||
from .exceptions import PipelineError
|
||||
from .models import PipelineResult
|
||||
from .pool import WorkerPool
|
||||
from .queue import ChunkQueue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Pipeline:
    """
    Orchestrates the chunk processing pipeline.

    The pipeline runs in three stages:
      1. Producer thread: Chunker probes file → pushes time-based chunks to ChunkQueue
      2. Worker pool: N workers pull from queue → process segments → emit results
      3. Collector: ResultCollector reassembles results in sequence order

    Args:
        source: Path to the source media file
        chunk_duration: Duration of each chunk in seconds (default: 10.0)
        num_workers: Number of concurrent worker threads (default: 4)
        max_retries: Max retry attempts per chunk (default: 3)
        processor_type: Processor to use — "ffmpeg", "checksum", "simulated_decode", "composite"
        queue_size: Max chunks buffered in queue (default: 10)
        event_callback: Optional callback(event_type, data) for real-time events
        output_dir: Directory for output chunk files (required for "ffmpeg" processor)
    """

    def __init__(
        self,
        source: str,
        chunk_duration: float = 10.0,
        num_workers: int = 4,
        max_retries: int = 3,
        processor_type: str = "checksum",
        queue_size: int = 10,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
    ):
        self.source = source
        self.chunk_duration = chunk_duration
        self.num_workers = num_workers
        self.max_retries = max_retries
        self.processor_type = processor_type
        self.queue_size = queue_size
        self.event_callback = event_callback
        self.output_dir = output_dir

    def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
        """Forward an event to the registered callback, if any."""
        if self.event_callback:
            self.event_callback(event_type, data)

    def _produce_chunks(
        self, chunker: "Chunker", chunk_queue: "ChunkQueue"
    ) -> None:
        """Producer thread body: probe file and enqueue time-based chunks.

        Always closes the queue on exit so workers observe end-of-stream
        even when production fails part-way.
        """
        try:
            for chunk in chunker.chunks():
                chunk_queue.put(chunk, timeout=30.0)
                self._emit("chunk_queued", {
                    "sequence": chunk.sequence,
                    "start_time": chunk.start_time,
                    "end_time": chunk.end_time,
                    "duration": chunk.duration,
                    "queue_size": chunk_queue.qsize(),
                })
        except Exception as e:
            logger.error(f"Producer error: {e}")
            self._emit("producer_error", {"error": str(e)})
        finally:
            chunk_queue.close()

    def _write_manifest(
        self, result: "PipelineResult", source_duration: float
    ) -> None:
        """Write manifest.json to output_dir with segment metadata.

        No-op when output_dir is not configured.
        """
        if not self.output_dir:
            return

        manifest = {
            "source": self.source,
            "source_duration": source_duration,
            "chunk_duration": self.chunk_duration,
            "total_chunks": result.total_chunks,
            "processed": result.processed,
            "failed": result.failed,
            "elapsed_time": result.elapsed_time,
            "throughput_mbps": result.throughput_mbps,
            "segments": [
                {
                    "sequence": i,
                    "file": f"chunk_{i:04d}.mp4",
                    "start": i * self.chunk_duration,
                    "end": min(
                        (i + 1) * self.chunk_duration, source_duration
                    ),
                }
                # FIX: removed the redundant `if i < result.total_chunks`
                # guard — it is always true inside range(result.total_chunks).
                for i in range(result.total_chunks)
            ],
        }

        manifest_path = Path(self.output_dir) / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2))
        logger.info(f"Manifest written to {manifest_path}")

    def run(self) -> "PipelineResult":
        """
        Execute the full pipeline.

        Returns:
            PipelineResult with aggregate stats

        Raises:
            PipelineError: If the pipeline fails catastrophically
        """
        start_time = time.monotonic()
        self._emit("pipeline_start", {
            "source": self.source,
            "chunk_duration": self.chunk_duration,
            "num_workers": self.num_workers,
            "processor_type": self.processor_type,
        })

        pool = None  # tracked so failure paths can shut down started workers
        try:
            # Stage 1: Set up chunker (probes file for duration)
            chunker = Chunker(self.source, self.chunk_duration)
            total_chunks = chunker.expected_chunks

            if total_chunks == 0:
                self._emit("pipeline_complete", {"total_chunks": 0})
                return PipelineResult(chunks_in_order=True)

            self._emit("pipeline_info", {
                "file_size": chunker.file_size,
                "source_duration": chunker.source_duration,
                "total_chunks": total_chunks,
            })

            # Stage 2: Set up queue and worker pool
            chunk_queue = ChunkQueue(maxsize=self.queue_size)
            pool = WorkerPool(
                num_workers=self.num_workers,
                chunk_queue=chunk_queue,
                processor_type=self.processor_type,
                max_retries=self.max_retries,
                event_callback=self.event_callback,
                output_dir=self.output_dir,
            )

            # Stage 3: Start workers, then produce chunks
            pool.start()

            producer = threading.Thread(
                target=self._produce_chunks,
                args=(chunker, chunk_queue),
                name="chunk-producer",
                daemon=True,
            )
            producer.start()

            # Stage 4: Wait for all workers to finish
            all_results = pool.wait()
            producer.join(timeout=5.0)

            # Stage 5: Collect results in order
            collector = ResultCollector(total_chunks)
            for r in all_results:
                collector.add(r)
                self._emit("chunk_collected", {
                    "sequence": r.sequence,
                    "success": r.success,
                    "buffered": collector.buffered_count,
                    "emitted": collector.emitted_count,
                })

            # Build result
            elapsed = time.monotonic() - start_time
            file_size_mb = chunker.file_size / (1024 * 1024)
            throughput = file_size_mb / elapsed if elapsed > 0 else 0.0

            failed_results = [r for r in all_results if not r.success]
            total_retries = sum(r.retries for r in all_results)
            chunk_files = [
                r.output_file for r in all_results
                if r.success and r.output_file
            ]

            result = PipelineResult(
                total_chunks=total_chunks,
                processed=len(all_results),
                failed=len(failed_results),
                retries=total_retries,
                elapsed_time=elapsed,
                throughput_mbps=throughput,
                worker_stats=pool.get_worker_stats(),
                errors=[r.error for r in failed_results if r.error],
                chunks_in_order=collector.is_complete,
                output_dir=self.output_dir,
                chunk_files=chunk_files,
            )

            # Write manifest if output_dir is set
            self._write_manifest(result, chunker.source_duration)

            pool.shutdown()
            pool = None  # cleaned up; except paths must not shut down again

            self._emit("pipeline_complete", {
                "total_chunks": result.total_chunks,
                "processed": result.processed,
                "failed": result.failed,
                "elapsed": result.elapsed_time,
                "throughput_mbps": result.throughput_mbps,
            })

            return result

        except PipelineError:
            # BUG FIX: shut the pool down so worker threads and the queue
            # are not leaked when the pipeline aborts mid-run.
            if pool is not None:
                pool.shutdown()
            raise
        except Exception as e:
            if pool is not None:
                pool.shutdown()
            self._emit("pipeline_error", {"error": str(e)})
            raise PipelineError(f"Pipeline failed: {e}") from e
|
||||
125
core/chunker/pool.py
Normal file
125
core/chunker/pool.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""
|
||||
WorkerPool — manages N worker threads via ThreadPoolExecutor.
|
||||
|
||||
Demonstrates: Python concurrency — threading (Interview Topic 2).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
from .models import ChunkResult
|
||||
from .processor import (
|
||||
ChecksumProcessor,
|
||||
CompositeProcessor,
|
||||
FFmpegExtractProcessor,
|
||||
Processor,
|
||||
SimulatedDecodeProcessor,
|
||||
)
|
||||
from .queue import ChunkQueue
|
||||
from .worker import Worker
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_processor(
    processor_type: str = "checksum",
    output_dir: Optional[str] = None,
) -> "Processor":
    """Factory for processor instances.

    Args:
        processor_type: One of "ffmpeg", "checksum", "simulated_decode",
            "composite".
        output_dir: Target directory for extracted segments; required by
            the "ffmpeg" processor only.

    Raises:
        ValueError: For an unknown processor_type, or for "ffmpeg"
            without an output_dir.
    """
    if processor_type == "ffmpeg":
        if not output_dir:
            raise ValueError("output_dir required for ffmpeg processor")
        return FFmpegExtractProcessor(output_dir=output_dir)

    if processor_type == "checksum":
        return ChecksumProcessor()

    if processor_type == "simulated_decode":
        return SimulatedDecodeProcessor()

    if processor_type == "composite":
        return CompositeProcessor([
            ChecksumProcessor(),
            SimulatedDecodeProcessor(ms_per_second=50.0),
        ])

    raise ValueError(f"Unknown processor type: {processor_type}")
|
||||
|
||||
|
||||
class WorkerPool:
    """
    Manages N worker threads that process chunks concurrently.

    Args:
        num_workers: Number of concurrent worker threads (default: 4)
        chunk_queue: Shared queue to pull chunks from (a fresh ChunkQueue
            is created when omitted)
        processor_type: Type of processor for each worker (default: "checksum")
        max_retries: Max retry attempts per chunk (default: 3)
        event_callback: Optional callback for real-time events
        output_dir: Output directory passed to the processor factory
    """

    def __init__(
        self,
        num_workers: int = 4,
        chunk_queue: "Optional[ChunkQueue]" = None,
        processor_type: str = "checksum",
        max_retries: int = 3,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
        output_dir: Optional[str] = None,
    ):
        self.num_workers = num_workers
        self.chunk_queue = chunk_queue or ChunkQueue()
        self.processor_type = processor_type
        self.max_retries = max_retries
        self.event_callback = event_callback
        self.output_dir = output_dir
        self.shutdown_event = threading.Event()
        self._executor: Optional[ThreadPoolExecutor] = None
        self._futures: "List[Future]" = []
        self._workers: "List[Worker]" = []

    def start(self) -> None:
        """Spin up the executor and submit one long-running task per worker."""
        self._executor = ThreadPoolExecutor(
            max_workers=self.num_workers,
            thread_name_prefix="chunk-worker",
        )

        for idx in range(self.num_workers):
            # Each worker gets its own processor instance.
            new_worker = Worker(
                worker_id=f"worker-{idx}",
                chunk_queue=self.chunk_queue,
                processor=create_processor(self.processor_type, output_dir=self.output_dir),
                max_retries=self.max_retries,
                event_callback=self.event_callback,
            )
            self._workers.append(new_worker)
            self._futures.append(self._executor.submit(new_worker.run))

        logger.info(f"WorkerPool started with {self.num_workers} workers")

    def wait(self) -> "List[ChunkResult]":
        """Block until every worker task finishes; return combined results."""
        combined = []
        for fut in self._futures:
            combined.extend(fut.result())
        return combined

    def shutdown(self) -> None:
        """Signal shutdown and cleanup."""
        self.shutdown_event.set()
        self.chunk_queue.close()
        if self._executor is not None:
            self._executor.shutdown(wait=True)

    def get_worker_stats(self) -> Dict[str, Any]:
        """Per-worker processed/error/retry counters keyed by worker id."""
        stats = {}
        for w in self._workers:
            stats[w.worker_id] = {
                "processed": w.processed_count,
                "errors": w.error_count,
                "retries": w.retry_count,
            }
        return stats
|
||||
173
core/chunker/processor.py
Normal file
173
core/chunker/processor.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""
|
||||
Processor ABC and concrete implementations.
|
||||
|
||||
Demonstrates: OOP design principles — ABC, inheritance, composition (Interview Topic 4).
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from .exceptions import ChunkChecksumError
|
||||
from .models import Chunk, ChunkResult
|
||||
|
||||
|
||||
class Processor(ABC):
    """
    Interface implemented by every chunk processor.

    A processor encapsulates the work done on one chunk; the Worker drives
    it through process(chunk) and layers retry handling on top.
    """

    @abstractmethod
    def process(self, chunk: Chunk) -> ChunkResult:
        """Run this processor on *chunk* and return its ChunkResult."""
        ...
|
||||
|
||||
|
||||
class FFmpegExtractProcessor(Processor):
    """
    Cuts one time segment out of the source file via FFmpeg stream copy.

    Emits a playable per-chunk mp4 with no re-encoding.

    Args:
        output_dir: Directory to write chunk mp4 files
    """

    def __init__(self, output_dir: str):
        self.output_dir = output_dir
        # Ensure the destination exists before any chunk is processed.
        Path(output_dir).mkdir(parents=True, exist_ok=True)

    def process(self, chunk: Chunk) -> ChunkResult:
        from core.ffmpeg.transcode import TranscodeConfig, transcode

        started_at = time.monotonic()

        destination = str(Path(self.output_dir) / f"chunk_{chunk.sequence:04d}.mp4")

        # "copy" codecs: trim the segment without re-encoding.
        transcode(
            TranscodeConfig(
                input_path=chunk.source_path,
                output_path=destination,
                video_codec="copy",
                audio_codec="copy",
                trim_start=chunk.start_time,
                trim_end=chunk.end_time,
            )
        )

        # MD5 over the written file — integrity fingerprint, not security.
        # NOTE(review): the digest is computed but never stored on the
        # returned ChunkResult — confirm whether a checksum field is intended.
        digest = hashlib.md5()
        with open(destination, "rb") as fh:
            while block := fh.read(8192):
                digest.update(block)

        return ChunkResult(
            sequence=chunk.sequence,
            success=True,
            checksum_valid=True,
            processing_time=time.monotonic() - started_at,
            output_file=destination,
        )
|
||||
|
||||
|
||||
class ChecksumProcessor(Processor):
    """
    Sanity-checks a chunk's time-range metadata.

    Raises ChunkChecksumError when the range is empty or inverted.
    """

    def process(self, chunk: Chunk) -> ChunkResult:
        started_at = time.monotonic()

        # Guard clause (De Morgan of the original validity test): reject
        # non-positive duration or an inverted/empty time range.
        if chunk.duration <= 0 or chunk.end_time <= chunk.start_time:
            raise ChunkChecksumError(
                sequence=chunk.sequence,
                expected="valid time range",
                actual=f"{chunk.start_time}-{chunk.end_time}",
            )

        return ChunkResult(
            sequence=chunk.sequence,
            success=True,
            checksum_valid=True,
            processing_time=time.monotonic() - started_at,
        )
|
||||
|
||||
|
||||
class SimulatedDecodeProcessor(Processor):
    """
    Fakes decode work by sleeping in proportion to chunk duration.

    Handy for exercising the concurrency machinery without real FFmpeg.

    Args:
        ms_per_second: Milliseconds of simulated work per second of chunk duration (default: 100)
    """

    def __init__(self, ms_per_second: float = 100.0):
        self.ms_per_second = ms_per_second

    def process(self, chunk: Chunk) -> ChunkResult:
        started_at = time.monotonic()

        # Convert ms-per-source-second into an absolute sleep in seconds.
        time.sleep(self.ms_per_second * chunk.duration / 1000.0)

        return ChunkResult(
            sequence=chunk.sequence,
            success=True,
            checksum_valid=True,
            processing_time=time.monotonic() - started_at,
        )
|
||||
|
||||
|
||||
class CompositeProcessor(Processor):
    """
    Runs several processors in sequence against the same chunk.

    Stops at the first failing result and returns it unchanged; otherwise
    reports a combined success. Demonstrates the composition pattern.

    Args:
        processors: List of processors to chain
    """

    def __init__(self, processors: List[Processor]):
        if not processors:
            raise ValueError("CompositeProcessor requires at least one processor")
        self.processors = processors

    def process(self, chunk: Chunk) -> ChunkResult:
        started_at = time.monotonic()
        outcome = None

        for stage in self.processors:
            outcome = stage.process(chunk)
            if not outcome.success:
                # Short-circuit: surface the failing stage's result as-is.
                return outcome

        return ChunkResult(
            sequence=chunk.sequence,
            success=True,
            checksum_valid=outcome.checksum_valid if outcome else True,
            processing_time=time.monotonic() - started_at,
        )
|
||||
76
core/chunker/queue.py
Normal file
76
core/chunker/queue.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""
|
||||
ChunkQueue — bounded, thread-safe queue with sentinel-based shutdown.
|
||||
|
||||
Demonstrates: Core data structures — queue.Queue (Interview Topic 5).
|
||||
"""
|
||||
|
||||
import queue
|
||||
from typing import Optional
|
||||
|
||||
from .models import Chunk
|
||||
|
||||
# Sentinel value to signal workers to stop
|
||||
_SENTINEL = object()
|
||||
|
||||
|
||||
class ChunkQueue:
    """
    Bounded, thread-safe chunk queue with sentinel-based shutdown.

    Producers block while the queue is full, which gives natural
    backpressure and caps memory usage.

    Args:
        maxsize: Maximum number of chunks in the queue (default: 10)
    """

    def __init__(self, maxsize: int = 10):
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._closed = False
        self.maxsize = maxsize

    def put(self, chunk: Chunk, timeout: Optional[float] = None) -> None:
        """
        Enqueue a chunk, blocking while the queue is full (backpressure).

        Args:
            chunk: The chunk to enqueue
            timeout: Max seconds to wait (None = block forever)

        Raises:
            queue.Full: If timeout expires while queue is full
        """
        self._queue.put(chunk, timeout=timeout)

    def get(self, timeout: Optional[float] = None) -> Optional[Chunk]:
        """
        Dequeue the next chunk, or None once the queue has been closed.

        Args:
            timeout: Max seconds to wait (None = block forever)

        Returns:
            Chunk, or None when the shutdown sentinel is observed

        Raises:
            queue.Empty: If timeout expires while queue is empty
        """
        item = self._queue.get(timeout=timeout)
        if item is not _SENTINEL:
            return item
        # Put the sentinel back so every other consumer also observes it.
        self._queue.put(_SENTINEL)
        return None

    def close(self) -> None:
        """Insert the shutdown sentinel so consumers drain and stop."""
        # NOTE(review): put() blocks when the queue is full, so close() can
        # stall until a consumer makes room — confirm this is acceptable.
        self._closed = True
        self._queue.put(_SENTINEL)

    @property
    def is_closed(self) -> bool:
        # Reflects close() having been called; items may still be in flight.
        return self._closed

    def qsize(self) -> int:
        """Approximate number of items currently queued."""
        return self._queue.qsize()
|
||||
141
core/chunker/worker.py
Normal file
141
core/chunker/worker.py
Normal file
@@ -0,0 +1,141 @@
|
||||
"""
|
||||
Worker — pulls chunks from queue, processes with retry logic.
|
||||
|
||||
Demonstrates:
|
||||
- Exception handling and resilient code (Interview Topic 7)
|
||||
- Concurrency (Interview Topic 2) — workers run in thread pool
|
||||
"""
|
||||
|
||||
import logging
|
||||
import queue
|
||||
import time
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from .exceptions import ProcessorFailureError
|
||||
from .models import Chunk, ChunkResult
|
||||
from .processor import Processor
|
||||
from .queue import ChunkQueue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Worker:
    """
    Drains chunks from a queue, applying retry with exponential backoff.

    Args:
        worker_id: Identifier for this worker (e.g. "worker-0")
        chunk_queue: Source queue to pull chunks from
        processor: Processor instance to use
        max_retries: Maximum retry attempts per chunk (default: 3)
        event_callback: Optional callback for real-time status updates
    """

    def __init__(
        self,
        worker_id: str,
        chunk_queue: ChunkQueue,
        processor: Processor,
        max_retries: int = 3,
        event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
    ):
        self.worker_id = worker_id
        self.chunk_queue = chunk_queue
        self.processor = processor
        self.max_retries = max_retries
        self.event_callback = event_callback
        # Lifetime counters surfaced via WorkerPool.get_worker_stats().
        self.processed_count = 0
        self.error_count = 0
        self.retry_count = 0

    def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
        """Forward an event to the callback, tagged with this worker's id."""
        if self.event_callback is None:
            return
        self.event_callback(event_type, {"worker_id": self.worker_id, **data})

    def _process_with_retry(self, chunk: Chunk) -> ChunkResult:
        """
        Run the processor on *chunk*, retrying with exponential backoff.

        Retry delays: 0.1s, 0.2s, 0.4s, ... (doubles each attempt)
        """
        failure = None

        for attempt in range(self.max_retries + 1):
            try:
                if attempt:
                    # Backoff doubles per retry: 0.1 * 2^(attempt-1).
                    delay = 0.1 * (2 ** (attempt - 1))
                    self._emit("chunk_retry", {
                        "sequence": chunk.sequence,
                        "attempt": attempt,
                        "backoff": delay,
                    })
                    time.sleep(delay)
                    self.retry_count += 1

                outcome = self.processor.process(chunk)
                outcome.retries = attempt
                outcome.worker_id = self.worker_id
                return outcome

            except Exception as exc:
                failure = exc
                logger.warning(
                    f"{self.worker_id}: chunk {chunk.sequence} "
                    f"attempt {attempt + 1}/{self.max_retries + 1} failed: {exc}"
                )

        # Every attempt failed — report a terminal error result.
        self.error_count += 1
        self._emit("chunk_error", {
            "sequence": chunk.sequence,
            "error": str(failure),
            "retries": self.max_retries,
        })

        return ChunkResult(
            sequence=chunk.sequence,
            success=False,
            processing_time=0.0,
            error=str(failure),
            retries=self.max_retries,
            worker_id=self.worker_id,
        )

    def run(self) -> list[ChunkResult]:
        """
        Worker main loop: consume until the queue signals shutdown.

        Returns:
            List of ChunkResults processed by this worker
        """
        collected: list[ChunkResult] = []
        self._emit("worker_status", {"state": "idle"})

        while True:
            try:
                chunk = self.chunk_queue.get(timeout=1.0)
            except queue.Empty:
                # Queue momentarily empty — poll again.
                continue

            if chunk is None:
                # Sentinel: queue closed, stop consuming.
                break

            self._emit("chunk_processing", {
                "sequence": chunk.sequence,
                "state": "processing",
            })

            outcome = self._process_with_retry(chunk)
            collected.append(outcome)
            # Counts every chunk pulled, including ones that ended in failure.
            self.processed_count += 1

            self._emit("chunk_done", {
                "sequence": chunk.sequence,
                "success": outcome.success,
                "processing_time": outcome.processing_time,
                "retries": outcome.retries,
            })

        self._emit("worker_status", {"state": "stopped"})
        return collected
|
||||
15
core/jobs/__init__.py
Normal file
15
core/jobs/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""
|
||||
MPR Jobs Module
|
||||
|
||||
Provides executor abstraction and task dispatch for job processing.
|
||||
"""
|
||||
|
||||
from .executor import Executor, LocalExecutor, get_executor
|
||||
from .task import run_job
|
||||
|
||||
__all__ = [
|
||||
"Executor",
|
||||
"LocalExecutor",
|
||||
"get_executor",
|
||||
"run_job",
|
||||
]
|
||||
@@ -1,17 +1,16 @@
|
||||
"""
|
||||
Executor abstraction for job processing.
|
||||
|
||||
Supports different backends:
|
||||
- LocalExecutor: FFmpeg via Celery (default)
|
||||
- LambdaExecutor: AWS Lambda (future)
|
||||
Determines WHERE jobs run:
|
||||
- LocalExecutor: delegates to registered Handler (default)
|
||||
- LambdaExecutor: AWS Step Functions
|
||||
- GCPExecutor: Google Cloud Run Jobs
|
||||
"""
|
||||
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from core.ffmpeg.transcode import TranscodeConfig, transcode
|
||||
|
||||
# Configuration from environment
|
||||
MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local")
|
||||
|
||||
@@ -22,26 +21,18 @@ class Executor(ABC):
|
||||
@abstractmethod
|
||||
def run(
|
||||
self,
|
||||
job_type: str,
|
||||
job_id: str,
|
||||
source_path: str,
|
||||
output_path: str,
|
||||
preset: Optional[Dict[str, Any]] = None,
|
||||
trim_start: Optional[float] = None,
|
||||
trim_end: Optional[float] = None,
|
||||
duration: Optional[float] = None,
|
||||
payload: Dict[str, Any],
|
||||
progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Execute a transcode/trim job.
|
||||
Execute a job.
|
||||
|
||||
Args:
|
||||
job_type: Type of job ("transcode", "chunk", etc.)
|
||||
job_id: Unique job identifier
|
||||
source_path: Path to source file
|
||||
output_path: Path for output file
|
||||
preset: Transcode preset dict (optional, None = trim only)
|
||||
trim_start: Trim start time in seconds (optional)
|
||||
trim_end: Trim end time in seconds (optional)
|
||||
duration: Source duration in seconds (for progress calculation)
|
||||
payload: Job-type-specific configuration dict
|
||||
progress_callback: Called with (percent, details_dict)
|
||||
|
||||
Returns:
|
||||
@@ -51,62 +42,25 @@ class Executor(ABC):
|
||||
|
||||
|
||||
class LocalExecutor(Executor):
|
||||
"""Execute jobs locally using FFmpeg."""
|
||||
"""Execute jobs locally using registered handlers."""
|
||||
|
||||
def run(
|
||||
self,
|
||||
job_type: str,
|
||||
job_id: str,
|
||||
source_path: str,
|
||||
output_path: str,
|
||||
preset: Optional[Dict[str, Any]] = None,
|
||||
trim_start: Optional[float] = None,
|
||||
trim_end: Optional[float] = None,
|
||||
duration: Optional[float] = None,
|
||||
payload: Dict[str, Any],
|
||||
progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
|
||||
) -> bool:
|
||||
"""Execute job using local FFmpeg."""
|
||||
"""Execute job using the appropriate local handler."""
|
||||
from .registry import get_handler
|
||||
|
||||
# Build config from preset or use stream copy for trim-only
|
||||
if preset:
|
||||
config = TranscodeConfig(
|
||||
input_path=source_path,
|
||||
output_path=output_path,
|
||||
video_codec=preset.get("video_codec", "libx264"),
|
||||
video_bitrate=preset.get("video_bitrate"),
|
||||
video_crf=preset.get("video_crf"),
|
||||
video_preset=preset.get("video_preset"),
|
||||
resolution=preset.get("resolution"),
|
||||
framerate=preset.get("framerate"),
|
||||
audio_codec=preset.get("audio_codec", "aac"),
|
||||
audio_bitrate=preset.get("audio_bitrate"),
|
||||
audio_channels=preset.get("audio_channels"),
|
||||
audio_samplerate=preset.get("audio_samplerate"),
|
||||
container=preset.get("container", "mp4"),
|
||||
extra_args=preset.get("extra_args", []),
|
||||
trim_start=trim_start,
|
||||
trim_end=trim_end,
|
||||
)
|
||||
else:
|
||||
# Trim-only: stream copy
|
||||
config = TranscodeConfig(
|
||||
input_path=source_path,
|
||||
output_path=output_path,
|
||||
video_codec="copy",
|
||||
audio_codec="copy",
|
||||
trim_start=trim_start,
|
||||
trim_end=trim_end,
|
||||
)
|
||||
|
||||
# Wrapper to convert float percent to int
|
||||
def wrapped_callback(percent: float, details: Dict[str, Any]) -> None:
|
||||
if progress_callback:
|
||||
progress_callback(int(percent), details)
|
||||
|
||||
return transcode(
|
||||
config,
|
||||
duration=duration,
|
||||
progress_callback=wrapped_callback if progress_callback else None,
|
||||
handler = get_handler(job_type)
|
||||
result = handler.process(
|
||||
job_id=job_id,
|
||||
payload=payload,
|
||||
progress_callback=progress_callback,
|
||||
)
|
||||
return result.get("status") == "completed"
|
||||
|
||||
|
||||
class LambdaExecutor(Executor):
|
||||
@@ -123,26 +77,18 @@ class LambdaExecutor(Executor):
|
||||
|
||||
def run(
|
||||
self,
|
||||
job_type: str,
|
||||
job_id: str,
|
||||
source_path: str,
|
||||
output_path: str,
|
||||
preset: Optional[Dict[str, Any]] = None,
|
||||
trim_start: Optional[float] = None,
|
||||
trim_end: Optional[float] = None,
|
||||
duration: Optional[float] = None,
|
||||
payload: Dict[str, Any],
|
||||
progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
|
||||
) -> bool:
|
||||
"""Start a Step Functions execution for this job."""
|
||||
import json
|
||||
|
||||
payload = {
|
||||
sfn_payload = {
|
||||
"job_type": job_type,
|
||||
"job_id": job_id,
|
||||
"source_key": source_path,
|
||||
"output_key": output_path,
|
||||
"preset": preset,
|
||||
"trim_start": trim_start,
|
||||
"trim_end": trim_end,
|
||||
"duration": duration,
|
||||
**payload,
|
||||
"callback_url": self.callback_url,
|
||||
"api_key": self.callback_api_key,
|
||||
}
|
||||
@@ -150,10 +96,9 @@ class LambdaExecutor(Executor):
|
||||
response = self.sfn.start_execution(
|
||||
stateMachineArn=self.state_machine_arn,
|
||||
name=f"mpr-{job_id}",
|
||||
input=json.dumps(payload),
|
||||
input=json.dumps(sfn_payload),
|
||||
)
|
||||
|
||||
# Store execution ARN on the job
|
||||
execution_arn = response["executionArn"]
|
||||
try:
|
||||
from core.db import update_job_fields
|
||||
@@ -179,13 +124,9 @@ class GCPExecutor(Executor):
|
||||
|
||||
def run(
|
||||
self,
|
||||
job_type: str,
|
||||
job_id: str,
|
||||
source_path: str,
|
||||
output_path: str,
|
||||
preset: Optional[Dict[str, Any]] = None,
|
||||
trim_start: Optional[float] = None,
|
||||
trim_end: Optional[float] = None,
|
||||
duration: Optional[float] = None,
|
||||
payload: Dict[str, Any],
|
||||
progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
|
||||
) -> bool:
|
||||
"""Trigger a Cloud Run Job execution for this job."""
|
||||
@@ -193,14 +134,10 @@ class GCPExecutor(Executor):
|
||||
|
||||
from google.cloud import run_v2
|
||||
|
||||
payload = {
|
||||
gcp_payload = {
|
||||
"job_type": job_type,
|
||||
"job_id": job_id,
|
||||
"source_key": source_path,
|
||||
"output_key": output_path,
|
||||
"preset": preset,
|
||||
"trim_start": trim_start,
|
||||
"trim_end": trim_end,
|
||||
"duration": duration,
|
||||
**payload,
|
||||
"callback_url": self.callback_url,
|
||||
"api_key": self.callback_api_key,
|
||||
}
|
||||
@@ -216,7 +153,8 @@ class GCPExecutor(Executor):
|
||||
run_v2.RunJobRequest.Overrides.ContainerOverride(
|
||||
env=[
|
||||
run_v2.EnvVar(
|
||||
name="MPR_JOB_PAYLOAD", value=json.dumps(payload)
|
||||
name="MPR_JOB_PAYLOAD",
|
||||
value=json.dumps(gcp_payload),
|
||||
)
|
||||
]
|
||||
)
|
||||
5
core/jobs/handlers/__init__.py
Normal file
5
core/jobs/handlers/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Job handlers — type-specific execution logic."""
|
||||
|
||||
from .base import Handler
|
||||
|
||||
__all__ = ["Handler"]
|
||||
33
core/jobs/handlers/base.py
Normal file
33
core/jobs/handlers/base.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""
|
||||
Base Handler ABC — defines the interface for job-type-specific execution logic.
|
||||
|
||||
A Handler knows HOW to execute a specific kind of job (transcode, chunk, etc.).
|
||||
The Executor decides WHERE to run it (local, Lambda, GCP).
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
|
||||
class Handler(ABC):
    """
    Interface for job-type-specific execution logic.

    A Handler knows HOW a given kind of job runs (transcode, chunk, ...);
    the Executor decides WHERE it runs (local, Lambda, GCP).
    """

    @abstractmethod
    def process(
        self,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """
        Execute the job.

        Args:
            job_id: Unique job identifier
            payload: Job-type-specific configuration
            progress_callback: Called with (percent, details_dict)

        Returns:
            Result dict with at least {"status": "completed"} or raises
        """
        ...
|
||||
119
core/jobs/handlers/chunk.py
Normal file
119
core/jobs/handlers/chunk.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""
|
||||
ChunkHandler — job handler that wraps the chunker Pipeline.
|
||||
|
||||
Downloads source from S3/MinIO, runs FFmpeg chunking pipeline,
|
||||
uploads mp4 segments + manifest back to S3/MinIO.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from core.chunker import Pipeline
|
||||
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
|
||||
|
||||
from .base import Handler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChunkHandler(Handler):
    """
    Handles chunk processing jobs by delegating to the chunker Pipeline.

    Downloads the source from S3/MinIO, runs the chunking pipeline, then
    uploads the resulting mp4 segments and manifest back to BUCKET_OUT.

    Expected payload keys:
        source_key: str — S3 key of the source file in BUCKET_IN
        chunk_duration: float — seconds per chunk (default: 10.0)
        num_workers: int — concurrent workers (default: 4)
        max_retries: int — retries per chunk (default: 3)
        processor_type: str — "ffmpeg", "checksum", "simulated_decode", "composite"
        queue_size: int — max queue depth (default: 10)
    """

    def process(
        self,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """
        Run the chunker pipeline for one job.

        Args:
            job_id: Unique job identifier
            payload: See class docstring for expected keys
            progress_callback: Called with (percent, details_dict)

        Returns:
            Result dict summarizing the pipeline run and the uploaded keys

        Raises:
            KeyError: If payload lacks "source_key"
        """
        source_key = payload["source_key"]
        processor_type = payload.get("processor_type", "ffmpeg")

        logger.info(f"ChunkHandler starting job {job_id}: {source_key}")

        # Download source from S3/MinIO
        tmp_source = download_to_temp(BUCKET_IN, source_key)

        # Create temp output directory for chunks
        tmp_output_dir = tempfile.mkdtemp(prefix=f"chunks-{job_id}-")

        try:
            def event_bridge(event_type: str, data: Dict[str, Any]) -> None:
                """Bridge pipeline events to the job progress callback."""
                if progress_callback and event_type == "pipeline_complete":
                    progress_callback(100, data)
                elif progress_callback and event_type == "chunk_done":
                    total = data.get("total_chunks", 1)
                    if total > 0:
                        # Cap at 99 so 100% is reserved for pipeline_complete.
                        pct = min(int((data.get("sequence", 0) + 1) / total * 100), 99)
                        progress_callback(pct, data)

            pipeline = Pipeline(
                source=tmp_source,
                chunk_duration=payload.get("chunk_duration", 10.0),
                num_workers=payload.get("num_workers", 4),
                max_retries=payload.get("max_retries", 3),
                processor_type=processor_type,
                queue_size=payload.get("queue_size", 10),
                event_callback=event_bridge,
                # Only the ffmpeg processor writes files, so only it needs a dir.
                output_dir=tmp_output_dir if processor_type == "ffmpeg" else None,
            )

            result = pipeline.run()

            # Upload chunks + manifest to S3/MinIO
            output_prefix = f"chunks/{job_id}"
            uploaded_files = []

            for chunk_file in result.chunk_files:
                filename = os.path.basename(chunk_file)
                # BUG FIX: the key previously used a literal placeholder instead
                # of the chunk's basename, so every chunk mapped to the same key.
                output_key = f"{output_prefix}/{filename}"
                upload_file(chunk_file, BUCKET_OUT, output_key)
                uploaded_files.append(output_key)
                logger.info(f"Uploaded {output_key}")

            # Upload manifest if the pipeline produced one
            manifest_path = os.path.join(tmp_output_dir, "manifest.json")
            if os.path.exists(manifest_path):
                manifest_key = f"{output_prefix}/manifest.json"
                upload_file(manifest_path, BUCKET_OUT, manifest_key)
                uploaded_files.append(manifest_key)
                logger.info(f"Uploaded {manifest_key}")

            return {
                "status": "completed" if result.failed == 0 else "completed_with_errors",
                "total_chunks": result.total_chunks,
                "processed": result.processed,
                "failed": result.failed,
                "retries": result.retries,
                "elapsed_time": result.elapsed_time,
                "throughput_mbps": result.throughput_mbps,
                "worker_stats": result.worker_stats,
                "errors": result.errors,
                "chunks_in_order": result.chunks_in_order,
                "output_prefix": output_prefix,
                "uploaded_files": uploaded_files,
            }

        finally:
            # Cleanup temp files — best-effort, never mask the real outcome.
            try:
                os.unlink(tmp_source)
            except OSError:
                pass
            shutil.rmtree(tmp_output_dir, ignore_errors=True)
|
||||
104
core/jobs/handlers/transcode.py
Normal file
104
core/jobs/handlers/transcode.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""
|
||||
TranscodeHandler — executes transcode/trim jobs using FFmpeg.
|
||||
|
||||
Extracted from the old tasks.py Celery task logic.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from core.ffmpeg.transcode import TranscodeConfig, transcode
|
||||
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
|
||||
|
||||
from .base import Handler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TranscodeHandler(Handler):
    """Run transcode/trim jobs through FFmpeg, staging files via S3/MinIO."""

    @staticmethod
    def _build_config(tmp_source, tmp_output, preset, trim_start, trim_end):
        """Translate an optional preset dict into a TranscodeConfig; stream copy when absent."""
        if preset:
            return TranscodeConfig(
                input_path=tmp_source,
                output_path=tmp_output,
                video_codec=preset.get("video_codec", "libx264"),
                video_bitrate=preset.get("video_bitrate"),
                video_crf=preset.get("video_crf"),
                video_preset=preset.get("video_preset"),
                resolution=preset.get("resolution"),
                framerate=preset.get("framerate"),
                audio_codec=preset.get("audio_codec", "aac"),
                audio_bitrate=preset.get("audio_bitrate"),
                audio_channels=preset.get("audio_channels"),
                audio_samplerate=preset.get("audio_samplerate"),
                container=preset.get("container", "mp4"),
                extra_args=preset.get("extra_args", []),
                trim_start=trim_start,
                trim_end=trim_end,
            )
        # No preset: trim-only via stream copy (no re-encode).
        return TranscodeConfig(
            input_path=tmp_source,
            output_path=tmp_output,
            video_codec="copy",
            audio_codec="copy",
            trim_start=trim_start,
            trim_end=trim_end,
        )

    def process(
        self,
        job_id: str,
        payload: Dict[str, Any],
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """
        Download, transcode (or trim), and upload one job's media.

        Args:
            job_id: Unique job identifier
            payload: Expects source_key/output_key plus optional preset,
                trim_start, trim_end, duration
            progress_callback: Called with (percent, details_dict)

        Returns:
            {"status": "completed", "job_id": ..., "output_key": ...}

        Raises:
            RuntimeError: If the underlying transcode reports failure
        """
        source_key = payload["source_key"]
        output_key = payload["output_key"]
        preset = payload.get("preset")
        trim_start = payload.get("trim_start")
        trim_end = payload.get("trim_end")
        duration = payload.get("duration")

        logger.info(f"TranscodeHandler: {source_key} -> {output_key}")

        # Stage the source locally.
        tmp_source = download_to_temp(BUCKET_IN, source_key)

        # Reserve a local output path carrying the destination's extension.
        ext = Path(output_key).suffix or ".mp4"
        fd, tmp_output = tempfile.mkstemp(suffix=ext)
        os.close(fd)

        try:
            config = self._build_config(tmp_source, tmp_output, preset, trim_start, trim_end)

            def wrapped_callback(percent: float, details: Dict[str, Any]) -> None:
                # transcode reports float percent; job progress wants int.
                if progress_callback:
                    progress_callback(int(percent), details)

            success = transcode(
                config,
                duration=duration,
                progress_callback=wrapped_callback if progress_callback else None,
            )

            if not success:
                raise RuntimeError("Transcode returned False")

            # Ship the finished file to the output bucket.
            logger.info(f"Uploading {output_key} to {BUCKET_OUT}")
            upload_file(tmp_output, BUCKET_OUT, output_key)

            return {
                "status": "completed",
                "job_id": job_id,
                "output_key": output_key,
            }

        finally:
            # Best-effort local cleanup of both staged files.
            for staged in (tmp_source, tmp_output):
                try:
                    os.unlink(staged)
                except OSError:
                    pass
|
||||
33
core/jobs/registry.py
Normal file
33
core/jobs/registry.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""
|
||||
Handler registry — maps job_type strings to Handler classes.
|
||||
"""
|
||||
|
||||
from typing import Dict, Type
|
||||
|
||||
from .handlers.base import Handler
|
||||
|
||||
# job_type string → Handler subclass lookup table.
_handlers: Dict[str, Type[Handler]] = {}


def register_handler(job_type: str, handler_class: Type[Handler]) -> None:
    """Register *handler_class* as the implementation for *job_type*."""
    _handlers[job_type] = handler_class


def get_handler(job_type: str) -> Handler:
    """
    Instantiate and return the handler registered for *job_type*.

    Raises:
        ValueError: If no handler is registered for *job_type*
    """
    try:
        handler_class = _handlers[job_type]
    except KeyError:
        raise ValueError(f"Unknown job type: {job_type}") from None
    return handler_class()
|
||||
|
||||
|
||||
def _register_defaults() -> None:
    """Register the built-in handlers shipped with this package."""
    # Imported lazily to avoid circular imports at module load time.
    from .handlers.chunk import ChunkHandler
    from .handlers.transcode import TranscodeHandler

    register_handler("transcode", TranscodeHandler)
    register_handler("chunk", ChunkHandler)


# Populate the registry as soon as this module is imported.
_register_defaults()
|
||||
64
core/jobs/task.py
Normal file
64
core/jobs/task.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""
|
||||
Celery task for job processing.
|
||||
|
||||
Generic dispatcher — routes to the appropriate handler based on job_type.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
|
||||
from celery import shared_task
|
||||
|
||||
from core.rpc.server import update_job_progress
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def run_job(
    self,
    job_type: str,
    job_id: str,
    payload: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Generic Celery task: dispatch *job_type* to its registered handler.

    Progress and terminal status are mirrored into the shared job state via
    update_job_progress. On failure the task retries per the Celery policy
    above, returning a failure dict once retries are exhausted.
    """
    logger.info(f"Starting {job_type} job {job_id}")

    update_job_progress(job_id, progress=0, status="processing")

    def progress_callback(percent: int, details: Dict[str, Any]) -> None:
        # Mirror handler progress into the shared job state.
        update_job_progress(
            job_id,
            progress=percent,
            current_time=details.get("time", 0.0),
            status="processing",
        )

    try:
        # Imported here so the registry (and its handler imports) only load
        # inside the worker process.
        from .registry import get_handler

        outcome = get_handler(job_type).process(
            job_id=job_id,
            payload=payload,
            progress_callback=progress_callback,
        )

        logger.info(f"Job {job_id} completed successfully")
        update_job_progress(job_id, progress=100, status="completed")
        return outcome

    except Exception as e:
        logger.exception(f"Job {job_id} failed: {e}")
        update_job_progress(job_id, progress=0, status="failed", error=str(e))

        # Let Celery's retry policy kick in before giving up for good.
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e)

        return {
            "status": "failed",
            "job_id": job_id,
            "error": str(e),
        }
|
||||
@@ -59,17 +59,24 @@ class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer):
|
||||
|
||||
# Dispatch to Celery if available
|
||||
if self.celery_app:
|
||||
from core.task.tasks import run_transcode_job
|
||||
from core.jobs.task import run_job
|
||||
|
||||
task = run_transcode_job.delay(
|
||||
job_id=job_id,
|
||||
source_path=request.source_path,
|
||||
output_path=request.output_path,
|
||||
preset=preset,
|
||||
trim_start=request.trim_start
|
||||
payload = {
|
||||
"source_key": request.source_path,
|
||||
"output_key": request.output_path,
|
||||
"preset": preset,
|
||||
"trim_start": request.trim_start
|
||||
if request.HasField("trim_start")
|
||||
else None,
|
||||
trim_end=request.trim_end if request.HasField("trim_end") else None,
|
||||
"trim_end": request.trim_end
|
||||
if request.HasField("trim_end")
|
||||
else None,
|
||||
}
|
||||
|
||||
task = run_job.delay(
|
||||
job_type="transcode",
|
||||
job_id=job_id,
|
||||
payload=payload,
|
||||
)
|
||||
_active_jobs[job_id]["celery_task_id"] = task.id
|
||||
|
||||
@@ -197,11 +204,14 @@ def update_job_progress(
|
||||
speed: float = 0.0,
|
||||
status: str = "processing",
|
||||
error: str = None,
|
||||
**extra,
|
||||
) -> None:
|
||||
"""
|
||||
Update job progress (called from worker tasks).
|
||||
|
||||
Updates both the in-memory gRPC state and the Django database.
|
||||
Extra kwargs are stored for chunker-specific fields (total_chunks,
|
||||
processed_chunks, failed_chunks, throughput_mbps, etc.).
|
||||
"""
|
||||
if job_id in _active_jobs:
|
||||
_active_jobs[job_id].update(
|
||||
@@ -212,6 +222,7 @@ def update_job_progress(
|
||||
"speed": speed,
|
||||
"status": status,
|
||||
"error": error,
|
||||
**extra,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -23,12 +23,12 @@ from .grpc import (
|
||||
ProgressUpdate,
|
||||
WorkerStatus,
|
||||
)
|
||||
from .jobs import JobStatus, TranscodeJob
|
||||
from .jobs import ChunkJob, ChunkJobStatus, JobStatus, TranscodeJob
|
||||
from .media import AssetStatus, MediaAsset
|
||||
from .presets import BUILTIN_PRESETS, TranscodePreset
|
||||
|
||||
# Core domain models - generates Django, Pydantic, TypeScript
|
||||
DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob]
|
||||
DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob, ChunkJob]
|
||||
|
||||
# API request/response models - generates TypeScript only (no Django)
|
||||
# WorkerStatus from grpc.py is reused here
|
||||
@@ -42,7 +42,7 @@ API_MODELS = [
|
||||
]
|
||||
|
||||
# Status enums - included in generated code
|
||||
ENUMS = [AssetStatus, JobStatus]
|
||||
ENUMS = [AssetStatus, JobStatus, ChunkJobStatus]
|
||||
|
||||
# gRPC messages - generates Proto
|
||||
GRPC_MESSAGES = [
|
||||
@@ -61,6 +61,7 @@ __all__ = [
|
||||
"MediaAsset",
|
||||
"TranscodePreset",
|
||||
"TranscodeJob",
|
||||
"ChunkJob",
|
||||
# API Models
|
||||
"CreateJobRequest",
|
||||
"UpdateAssetRequest",
|
||||
@@ -70,6 +71,7 @@ __all__ = [
|
||||
# Enums
|
||||
"AssetStatus",
|
||||
"JobStatus",
|
||||
"ChunkJobStatus",
|
||||
# gRPC
|
||||
"GRPC_SERVICE",
|
||||
"JobRequest",
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
"""
|
||||
TranscodeJob Schema Definition
|
||||
Job Schema Definitions
|
||||
|
||||
Source of truth for job data model.
|
||||
Source of truth for job data models.
|
||||
TranscodeJob and ChunkJob share common lifecycle fields by convention.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
from uuid import UUID
|
||||
|
||||
|
||||
@@ -77,3 +78,56 @@ class TranscodeJob:
|
||||
return self.preset_id is None and (
|
||||
self.trim_start is not None or self.trim_end is not None
|
||||
)
|
||||
|
||||
|
||||
class ChunkJobStatus(str, Enum):
    """Status of a chunk pipeline job.

    Inherits ``str`` so members serialize directly to JSON and compare
    equal to their string values.
    """

    PENDING = "pending"          # created, not yet picked up
    CHUNKING = "chunking"        # source is being split into chunks
    PROCESSING = "processing"    # chunks are being worked on
    COLLECTING = "collecting"    # chunk results are being gathered
    COMPLETED = "completed"      # terminal: success
    FAILED = "failed"            # terminal: job gave up
    CANCELLED = "cancelled"      # terminal: stopped on request
|
||||
|
||||
|
||||
@dataclass
class ChunkJob:
    """
    A chunk pipeline job — splits a media file into chunks and processes them
    through a concurrent worker pool.

    Shares common lifecycle fields (status/progress/timestamps) with
    TranscodeJob by convention.
    """

    # Primary key of the job.
    id: UUID

    # Input
    source_asset_id: UUID  # asset to be chunked

    # Configuration
    chunk_duration: float = 10.0  # target duration of each chunk, seconds
    num_workers: int = 4  # size of the concurrent worker pool
    max_retries: int = 3  # retry budget (presumably per chunk — confirm in handler)
    processor_type: str = "ffmpeg"  # "ffmpeg", "checksum", "simulated_decode", "composite"

    # Status & Progress
    status: ChunkJobStatus = ChunkJobStatus.PENDING
    progress: float = 0.0  # 0.0 to 100.0
    total_chunks: int = 0  # 0 until chunk count is known
    processed_chunks: int = 0  # chunks finished successfully so far
    failed_chunks: int = 0  # chunks that did not succeed
    retry_count: int = 0  # retries performed so far
    error_message: Optional[str] = None  # populated on failure

    # Result stats (populated when the job finishes)
    throughput_mbps: Optional[float] = None
    elapsed_seconds: Optional[float] = None

    # Worker tracking
    celery_task_id: Optional[str] = None  # Celery task driving this job
    priority: int = 0  # Lower = higher priority

    # Timestamps (timezone unspecified here — confirm UTC at persistence layer)
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
"""
|
||||
MPR Worker Module
|
||||
|
||||
Provides executor abstraction and Celery tasks for job processing.
|
||||
"""
|
||||
|
||||
from .executor import Executor, LocalExecutor, get_executor
|
||||
from .tasks import run_transcode_job
|
||||
|
||||
# Public API of the worker module: the executor abstraction and the
# transcode Celery task.
__all__ = [
    "Executor",
    "LocalExecutor",
    "get_executor",
    "run_transcode_job",
]
|
||||
@@ -1,105 +0,0 @@
|
||||
"""
|
||||
Celery tasks for job processing.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from celery import shared_task
|
||||
|
||||
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
|
||||
from core.rpc.server import update_job_progress
|
||||
from core.task.executor import get_executor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(bind=True, queue="transcode", max_retries=3, default_retry_delay=60)
def run_transcode_job(
    self,
    job_id: str,
    source_key: str,
    output_key: str,
    preset: Optional[Dict[str, Any]] = None,
    trim_start: Optional[float] = None,
    trim_end: Optional[float] = None,
    duration: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Celery task to run a transcode/trim job.

    Downloads source from S3, runs FFmpeg, uploads result to S3.

    Args:
        job_id: Identifier used for progress reporting.
        source_key: Object key of the input in ``BUCKET_IN``.
        output_key: Object key for the result in ``BUCKET_OUT``.
        preset: Optional encoder preset dict passed through to the executor.
        trim_start: Optional trim-in point (presumably seconds — confirm).
        trim_end: Optional trim-out point (presumably seconds — confirm).
        duration: Optional duration hint forwarded to the executor.

    Returns:
        ``{"status": "completed", ...}`` on success, or
        ``{"status": "failed", ...}`` once retries are exhausted.
    """
    logger.info(f"Starting job {job_id}: {source_key} -> {output_key}")

    update_job_progress(job_id, progress=0, status="processing")

    # Download source from S3 to temp file.
    # NOTE(review): this runs outside the try/except below, so a download
    # failure is neither retried nor reported as "failed".
    logger.info(f"Downloading {source_key} from {BUCKET_IN}")
    tmp_source = download_to_temp(BUCKET_IN, source_key)

    # Create temp output path with same extension as the output key.
    import tempfile
    from pathlib import Path

    ext = Path(output_key).suffix or ".mp4"
    fd, tmp_output = tempfile.mkstemp(suffix=ext)
    os.close(fd)  # only the path is needed; the executor writes the file

    def progress_callback(percent: int, details: Dict[str, Any]) -> None:
        # Relay executor progress into the shared job state.
        update_job_progress(
            job_id,
            progress=percent,
            current_time=details.get("time", 0.0),
            status="processing",
        )

    try:
        executor = get_executor()
        success = executor.run(
            job_id=job_id,
            source_path=tmp_source,
            output_path=tmp_output,
            preset=preset,
            trim_start=trim_start,
            trim_end=trim_end,
            duration=duration,
            progress_callback=progress_callback,
        )

        if success:
            # Upload result to S3
            logger.info(f"Uploading {output_key} to {BUCKET_OUT}")
            upload_file(tmp_output, BUCKET_OUT, output_key)

            logger.info(f"Job {job_id} completed successfully")
            update_job_progress(job_id, progress=100, status="completed")
            return {
                "status": "completed",
                "job_id": job_id,
                "output_key": output_key,
            }
        else:
            # Executor signalled failure without raising; normalize to an
            # exception so the retry path below handles it.
            raise RuntimeError("Executor returned False")

    except Exception as e:
        logger.exception(f"Job {job_id} failed: {e}")
        update_job_progress(job_id, progress=0, status="failed", error=str(e))

        if self.request.retries < self.max_retries:
            raise self.retry(exc=e)

        return {
            "status": "failed",
            "job_id": job_id,
            "error": str(e),
        }

    finally:
        # Clean up temp files (best-effort; missing files are ignored)
        for f in [tmp_source, tmp_output]:
            try:
                os.unlink(f)
            except OSError:
                pass
|
||||
Reference in New Issue
Block a user