gRPC and worker

This commit is contained in:
2026-02-03 13:40:28 -03:00
parent 67573713bd
commit a5057ba412
9 changed files with 782 additions and 34 deletions

View File

@@ -1,23 +0,0 @@
# MPR Environment Configuration
# Copy to .env and adjust values as needed
# Database
POSTGRES_DB=mpr
POSTGRES_USER=mpr_user
POSTGRES_PASSWORD=mpr_pass
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
# Redis
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}/0
# Django
DEBUG=1
DJANGO_SETTINGS_MODULE=mpr.settings
SECRET_KEY=change-this-in-production
# Worker
MPR_EXECUTOR=local

View File

@@ -1,21 +1,28 @@
# MPR Control Environment
# Copy to .env and adjust values
# MPR Environment Configuration
# Copy to .env and adjust values as needed
# Database
POSTGRES_DB=mpr
POSTGRES_USER=mpr_user
POSTGRES_PASSWORD=mpr_pass
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
# Ports (non-default values to avoid conflicts with other local services)
# NOTE: POSTGRES_PORT and REDIS_PORT are also assigned in the sections below
# with different values — remove one of the duplicate definitions (which value
# wins with duplicates depends on the dotenv loader).
POSTGRES_PORT=5433
REDIS_PORT=6380
DJANGO_PORT=8701
FASTAPI_PORT=8702
TIMELINE_PORT=5173
# Redis
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}/0
# Django
DEBUG=1
DJANGO_SETTINGS_MODULE=mpr.settings
SECRET_KEY=change-this-in-production
# Worker
MPR_EXECUTOR=local
# Remote deployment (optional)
# SERVER=user@host
# REMOTE_PATH=~/mpr
# gRPC
GRPC_HOST=grpc
GRPC_PORT=50051
GRPC_MAX_WORKERS=10

View File

@@ -3,6 +3,8 @@ x-common-env: &common-env
REDIS_URL: redis://redis:6379/0
DJANGO_SETTINGS_MODULE: mpr.settings
DEBUG: 1
GRPC_HOST: grpc
GRPC_PORT: 50051
x-healthcheck-defaults: &healthcheck-defaults
interval: 5s
@@ -93,6 +95,26 @@ services:
redis:
condition: service_healthy
grpc:
build:
context: ..
dockerfile: ctrl/Dockerfile
command: python -m grpc.server
ports:
- "50051:50051"
environment:
<<: *common-env
GRPC_PORT: 50051
GRPC_MAX_WORKERS: 10
volumes:
- ..:/app
- ../media:/app/media
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
celery:
build:
context: ..
@@ -109,6 +131,8 @@ services:
condition: service_healthy
redis:
condition: service_healthy
grpc:
condition: service_started
timeline:
build:

21
grpc/__init__.py Normal file
View File

@@ -0,0 +1,21 @@
"""
MPR gRPC Module
Provides gRPC server and client for worker communication.
Generated stubs (worker_pb2.py, worker_pb2_grpc.py) are created by:
python schema/generate.py --proto
Requires: grpcio, grpcio-tools
"""
from .client import WorkerClient, get_client
from .server import WorkerServicer, serve, update_job_progress
__all__ = [
"WorkerClient",
"WorkerServicer",
"get_client",
"serve",
"update_job_progress",
]

208
grpc/client.py Normal file
View File

@@ -0,0 +1,208 @@
"""
gRPC Client - Used by FastAPI to communicate with workers.
"""
import json
import logging
import os
from typing import Callable, Iterator, Optional
import grpc
# Generated stubs - run `python schema/generate.py --proto` if missing
try:
from . import worker_pb2, worker_pb2_grpc
except ImportError:
import worker_pb2
import worker_pb2_grpc
logger = logging.getLogger(__name__)
# Configuration from environment
GRPC_HOST = os.environ.get("GRPC_HOST", "grpc")
GRPC_PORT = int(os.environ.get("GRPC_PORT", "50051"))
class WorkerClient:
    """gRPC client for worker communication.

    Wraps a lazily-created insecure channel to the worker's WorkerService
    and exposes exception-safe helpers for the four RPCs (SubmitJob,
    StreamProgress, CancelJob, GetWorkerStatus). Usable as a context
    manager; close() is idempotent.
    """

    def __init__(self, host: Optional[str] = None, port: Optional[int] = None):
        """
        Initialize the client.

        Args:
            host: gRPC server host (defaults to GRPC_HOST env var)
            port: gRPC server port (defaults to GRPC_PORT env var)
        """
        # Explicit None checks (rather than `or`) so falsy-but-valid values
        # such as port 0 are not silently replaced by the defaults.
        self.host = host if host is not None else GRPC_HOST
        self.port = port if port is not None else GRPC_PORT
        self.address = f"{self.host}:{self.port}"
        self._channel: Optional[grpc.Channel] = None
        self._stub: Optional[worker_pb2_grpc.WorkerServiceStub] = None

    def _ensure_connected(self) -> worker_pb2_grpc.WorkerServiceStub:
        """Ensure channel is connected and return stub (lazy connect)."""
        if self._channel is None:
            self._channel = grpc.insecure_channel(self.address)
            self._stub = worker_pb2_grpc.WorkerServiceStub(self._channel)
        return self._stub

    def close(self) -> None:
        """Close the channel. Safe to call multiple times."""
        if self._channel:
            self._channel.close()
            self._channel = None
            self._stub = None

    def __enter__(self):
        # Connect eagerly so connection errors surface at `with` entry.
        self._ensure_connected()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def submit_job(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[dict] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
    ) -> tuple[bool, str]:
        """
        Submit a job to the worker.

        Args:
            job_id: Unique job identifier
            source_path: Path to source file
            output_path: Path for output file
            preset: Transcode preset dict (optional)
            trim_start: Trim start time in seconds (optional)
            trim_end: Trim end time in seconds (optional)

        Returns:
            Tuple of (accepted: bool, message: str); RPC failures are
            reported as (False, error_text) rather than raised.
        """
        stub = self._ensure_connected()
        request = worker_pb2.JobRequest(
            job_id=job_id,
            source_path=source_path,
            output_path=output_path,
            preset_json=json.dumps(preset) if preset else "",
        )
        # Only assign optional proto fields when provided, so the server's
        # HasField() checks can distinguish "unset" from a real value.
        if trim_start is not None:
            request.trim_start = trim_start
        if trim_end is not None:
            request.trim_end = trim_end
        try:
            response = stub.SubmitJob(request)
            return response.accepted, response.message
        except grpc.RpcError as e:
            logger.error(f"SubmitJob RPC failed: {e}")
            return False, str(e)

    def stream_progress(
        self,
        job_id: str,
        callback: Optional[Callable[[dict], None]] = None,
    ) -> Iterator[dict]:
        """
        Stream progress updates for a job.

        Args:
            job_id: Job identifier
            callback: Optional callback invoked with each update dict

        Yields:
            Progress update dicts with keys: job_id, progress,
            current_frame, current_time, speed, status, error.
            On RPC failure a final dict with status "error" is yielded
            (that dict carries only job_id/progress/status/error).
        """
        stub = self._ensure_connected()
        request = worker_pb2.ProgressRequest(job_id=job_id)
        try:
            for update in stub.StreamProgress(request):
                progress = {
                    "job_id": update.job_id,
                    "progress": update.progress,
                    "current_frame": update.current_frame,
                    "current_time": update.current_time,
                    "speed": update.speed,
                    "status": update.status,
                    "error": update.error if update.HasField("error") else None,
                }
                if callback:
                    callback(progress)
                yield progress
                # Stop consuming once the job reaches a terminal status.
                if update.status in ("completed", "failed", "cancelled"):
                    break
        except grpc.RpcError as e:
            logger.error(f"StreamProgress RPC failed: {e}")
            yield {
                "job_id": job_id,
                "progress": 0,
                "status": "error",
                "error": str(e),
            }

    def cancel_job(self, job_id: str) -> tuple[bool, str]:
        """
        Cancel a running job.

        Args:
            job_id: Job identifier

        Returns:
            Tuple of (cancelled: bool, message: str); RPC failures are
            reported as (False, error_text) rather than raised.
        """
        stub = self._ensure_connected()
        request = worker_pb2.CancelRequest(job_id=job_id)
        try:
            response = stub.CancelJob(request)
            return response.cancelled, response.message
        except grpc.RpcError as e:
            logger.error(f"CancelJob RPC failed: {e}")
            return False, str(e)

    def get_worker_status(self) -> Optional[dict]:
        """
        Get worker status and capabilities.

        Returns:
            Status dict (available, active_jobs, supported_codecs,
            gpu_available) or None on RPC error.
        """
        stub = self._ensure_connected()
        try:
            response = stub.GetWorkerStatus(worker_pb2.Empty())
            return {
                "available": response.available,
                "active_jobs": response.active_jobs,
                "supported_codecs": list(response.supported_codecs),
                "gpu_available": response.gpu_available,
            }
        except grpc.RpcError as e:
            logger.error(f"GetWorkerStatus RPC failed: {e}")
            return None
# Lazily-created singleton client shared by the whole process.
_client: Optional[WorkerClient] = None


def get_client() -> WorkerClient:
    """Return the process-wide WorkerClient, creating it on first use.

    Connection settings come from the GRPC_HOST / GRPC_PORT environment
    variables, which are read at module import time.
    """
    global _client
    if _client is None:
        _client = WorkerClient()
    return _client

251
grpc/server.py Normal file
View File

@@ -0,0 +1,251 @@
"""
gRPC Server - Worker Service Implementation
Runs in the worker process to handle job submissions and progress streaming.
"""
import json
import logging
import os
import time
from concurrent import futures
from typing import Iterator
import grpc
# Configuration from environment
GRPC_PORT = int(os.environ.get("GRPC_PORT", "50051"))
GRPC_MAX_WORKERS = int(os.environ.get("GRPC_MAX_WORKERS", "10"))
# Generated stubs - run `python schema/generate.py --proto` if missing
try:
from . import worker_pb2, worker_pb2_grpc
except ImportError:
import worker_pb2
import worker_pb2_grpc
logger = logging.getLogger(__name__)
# Active jobs progress tracking (shared state for streaming)
_active_jobs: dict[str, dict] = {}
class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer):
    """gRPC service implementation for worker operations.

    Implements four RPCs: SubmitJob, StreamProgress (server-streaming),
    CancelJob, and GetWorkerStatus. Job state is tracked in the
    module-level ``_active_jobs`` dict.
    """

    def __init__(self, celery_app=None):
        """
        Initialize the servicer.

        Args:
            celery_app: Optional Celery app for task dispatch. When None,
                SubmitJob only records the job in ``_active_jobs`` without
                dispatching any work.
        """
        self.celery_app = celery_app

    def SubmitJob(self, request, context):
        """Submit a transcode/trim job to the worker.

        Returns a JobResponse with accepted=True on success, or
        accepted=False with the exception text on any failure.
        """
        job_id = request.job_id
        logger.info(f"SubmitJob: {job_id}")
        try:
            # Parse preset (empty preset_json means "no preset" / trim-only).
            preset = json.loads(request.preset_json) if request.preset_json else None
            # Initialize progress tracking; StreamProgress polls this entry.
            _active_jobs[job_id] = {
                "status": "pending",
                "progress": 0,
                "current_frame": 0,
                "current_time": 0.0,
                "speed": 0.0,
                "error": None,
            }
            # Dispatch to Celery if available.
            # NOTE(review): the dispatched task updates _active_jobs via
            # update_job_progress(); if the Celery worker runs in a separate
            # process/container, those updates will not be visible here —
            # confirm progress actually reaches StreamProgress.
            if self.celery_app:
                from worker.tasks import run_transcode_job
                # HasField() distinguishes "unset" from an explicit 0.0 on
                # the optional proto fields.
                task = run_transcode_job.delay(
                    job_id=job_id,
                    source_path=request.source_path,
                    output_path=request.output_path,
                    preset=preset,
                    trim_start=request.trim_start
                    if request.HasField("trim_start")
                    else None,
                    trim_end=request.trim_end if request.HasField("trim_end") else None,
                )
                # Remember the Celery task id so CancelJob can revoke it.
                _active_jobs[job_id]["celery_task_id"] = task.id
            return worker_pb2.JobResponse(
                job_id=job_id,
                accepted=True,
                message="Job submitted",
            )
        except Exception as e:
            logger.exception(f"SubmitJob failed: {e}")
            return worker_pb2.JobResponse(
                job_id=job_id,
                accepted=False,
                message=str(e),
            )

    def StreamProgress(self, request, context) -> Iterator[worker_pb2.ProgressUpdate]:
        """Stream progress updates for a job.

        Polls ``_active_jobs`` every 0.1s and yields an update whenever the
        progress value changes, until the job reaches a terminal status,
        its state disappears, or the client cancels the stream. Unknown
        job ids produce a single "not_found" update.
        """
        job_id = request.job_id
        logger.info(f"StreamProgress: {job_id}")
        # Check if job exists
        if job_id not in _active_jobs:
            yield worker_pb2.ProgressUpdate(
                job_id=job_id,
                progress=0,
                status="not_found",
                error="Job not found",
            )
            return
        # Stream updates until job completes
        last_progress = -1
        while True:
            if context.cancelled():
                logger.info(f"StreamProgress cancelled: {job_id}")
                break
            job_state = _active_jobs.get(job_id)
            if not job_state:
                # State removed (e.g. by a concurrent stream's cleanup).
                break
            # Only yield if progress changed — avoids flooding the client
            # with identical updates while polling.
            if job_state["progress"] != last_progress:
                last_progress = job_state["progress"]
                yield worker_pb2.ProgressUpdate(
                    job_id=job_id,
                    progress=job_state["progress"],
                    current_frame=job_state.get("current_frame", 0),
                    current_time=job_state.get("current_time", 0.0),
                    speed=job_state.get("speed", 0.0),
                    status=job_state["status"],
                    error=job_state.get("error"),
                )
            # Exit if job is done
            if job_state["status"] in ("completed", "failed", "cancelled"):
                break
            # Small delay to avoid busy loop
            time.sleep(0.1)
        # Cleanup completed jobs so _active_jobs does not grow unbounded.
        # NOTE(review): a terminal status change that never alters the
        # progress value is not re-yielded before cleanup — confirm clients
        # do not rely on seeing the final status in that case.
        if job_id in _active_jobs:
            status = _active_jobs[job_id].get("status")
            if status in ("completed", "failed", "cancelled"):
                _active_jobs.pop(job_id, None)

    def CancelJob(self, request, context):
        """Cancel a running job.

        Marks the job's in-memory status as "cancelled" and, when a Celery
        app is configured, revokes the associated task with terminate=True.
        Returns cancelled=False when the job id is unknown.
        """
        job_id = request.job_id
        logger.info(f"CancelJob: {job_id}")
        if job_id in _active_jobs:
            _active_jobs[job_id]["status"] = "cancelled"
            # Revoke Celery task if available
            if self.celery_app:
                task_id = _active_jobs[job_id].get("celery_task_id")
                if task_id:
                    self.celery_app.control.revoke(task_id, terminate=True)
            return worker_pb2.CancelResponse(
                job_id=job_id,
                cancelled=True,
                message="Job cancelled",
            )
        return worker_pb2.CancelResponse(
            job_id=job_id,
            cancelled=False,
            message="Job not found",
        )

    def GetWorkerStatus(self, request, context):
        """Get worker health and capabilities.

        Codec discovery failures are tolerated (empty codec list); GPU
        availability is inferred from encoder names containing nvenc,
        vaapi, or qsv.
        """
        try:
            from core.ffmpeg import get_encoders
            encoders = get_encoders()
            codec_names = [e["name"] for e in encoders.get("video", [])]
        except Exception:
            # Best effort: report no codecs rather than failing the RPC.
            codec_names = []
        # Check for GPU encoders
        gpu_available = any(
            "nvenc" in name or "vaapi" in name or "qsv" in name for name in codec_names
        )
        return worker_pb2.WorkerStatus(
            available=True,
            active_jobs=len(_active_jobs),
            supported_codecs=codec_names[:20],  # Limit to 20
            gpu_available=gpu_available,
        )
def update_job_progress(
    job_id: str,
    progress: int,
    current_frame: int = 0,
    current_time: float = 0.0,
    speed: float = 0.0,
    status: str = "processing",
    error: str = None,
) -> None:
    """
    Record the latest progress for *job_id* (called from worker tasks).

    Writes into the in-memory ``_active_jobs`` state that StreamProgress
    polls. Updates for unknown job ids are silently ignored.
    """
    state = _active_jobs.get(job_id)
    if state is None:
        return
    state["progress"] = progress
    state["current_frame"] = current_frame
    state["current_time"] = current_time
    state["speed"] = speed
    state["status"] = status
    state["error"] = error
def serve(port: int = None, celery_app=None) -> grpc.Server:
    """
    Start the gRPC server.

    Args:
        port: Port to listen on (defaults to GRPC_PORT env var)
        celery_app: Optional Celery app handed to the servicer for task
            dispatch and revocation

    Returns:
        The started server; the caller is responsible for shutdown or
        wait_for_termination().
    """
    port = GRPC_PORT if port is None else port
    # Thread pool sized by GRPC_MAX_WORKERS bounds concurrent RPC handlers.
    thread_pool = futures.ThreadPoolExecutor(max_workers=GRPC_MAX_WORKERS)
    server = grpc.server(thread_pool)
    worker_pb2_grpc.add_WorkerServiceServicer_to_server(
        WorkerServicer(celery_app=celery_app), server
    )
    # Insecure (plaintext) transport on all interfaces, IPv4 and IPv6.
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    logger.info(f"gRPC server started on port {port}")
    return server


if __name__ == "__main__":
    # Script entry point (docker runs `python -m grpc.server`): block until
    # the server terminates.
    logging.basicConfig(level=logging.INFO)
    server = serve()
    server.wait_for_termination()

15
worker/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
"""
MPR Worker Module
Provides executor abstraction and Celery tasks for job processing.
"""
from .executor import Executor, LocalExecutor, get_executor
from .tasks import run_transcode_job
__all__ = [
"Executor",
"LocalExecutor",
"get_executor",
"run_transcode_job",
]

149
worker/executor.py Normal file
View File

@@ -0,0 +1,149 @@
"""
Executor abstraction for job processing.
Supports different backends:
- LocalExecutor: FFmpeg via Celery (default)
- LambdaExecutor: AWS Lambda (future)
"""
import os
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Optional
from core.ffmpeg.transcode import TranscodeConfig, transcode
# Configuration from environment
MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local")
class Executor(ABC):
    """Abstract base class for job executors.

    Concrete subclasses (e.g. LocalExecutor) implement ``run`` to carry out
    a single transcode/trim job; ``get_executor`` picks the subclass named
    by the MPR_EXECUTOR environment variable.
    """

    @abstractmethod
    def run(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[Dict[str, Any]] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
        duration: Optional[float] = None,
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """
        Execute a transcode/trim job.

        Args:
            job_id: Unique job identifier.
            source_path: Path to the source file.
            output_path: Path for the output file.
            preset: Transcode preset dict; None requests a trim-only job.
            trim_start: Trim start time in seconds (optional).
            trim_end: Trim end time in seconds (optional).
            duration: Source duration in seconds (for progress calculation).
            progress_callback: Called with (percent, details_dict).

        Returns:
            True if successful.
        """
        ...  # implemented by concrete subclasses
class LocalExecutor(Executor):
    """Run jobs on this machine with FFmpeg via core.ffmpeg.transcode."""

    def run(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[Dict[str, Any]] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
        duration: Optional[float] = None,
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """Execute the job locally; see Executor.run for the contract."""
        # Paths and trim window are the same for both config shapes.
        common = dict(
            input_path=source_path,
            output_path=output_path,
            trim_start=trim_start,
            trim_end=trim_end,
        )
        if preset:
            # Full transcode: every knob comes from the preset, with the
            # same fallbacks the UI relies on (libx264/aac/mp4).
            config = TranscodeConfig(
                video_codec=preset.get("video_codec", "libx264"),
                video_bitrate=preset.get("video_bitrate"),
                video_crf=preset.get("video_crf"),
                video_preset=preset.get("video_preset"),
                resolution=preset.get("resolution"),
                framerate=preset.get("framerate"),
                audio_codec=preset.get("audio_codec", "aac"),
                audio_bitrate=preset.get("audio_bitrate"),
                audio_channels=preset.get("audio_channels"),
                audio_samplerate=preset.get("audio_samplerate"),
                container=preset.get("container", "mp4"),
                extra_args=preset.get("extra_args", []),
                **common,
            )
        else:
            # No preset: trim-only job using stream copy (no re-encode).
            config = TranscodeConfig(
                video_codec="copy",
                audio_codec="copy",
                **common,
            )

        # Adapt transcode()'s float percent to the int the callback expects.
        if progress_callback is None:
            on_progress = None
        else:
            def on_progress(percent: float, details: Dict[str, Any]) -> None:
                progress_callback(int(percent), details)

        return transcode(config, duration=duration, progress_callback=on_progress)
class LambdaExecutor(Executor):
    """Placeholder executor for running jobs on AWS Lambda (not built yet)."""

    def run(
        self,
        job_id: str,
        source_path: str,
        output_path: str,
        preset: Optional[Dict[str, Any]] = None,
        trim_start: Optional[float] = None,
        trim_end: Optional[float] = None,
        duration: Optional[float] = None,
        progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
    ) -> bool:
        """Always raises: the Lambda backend is not implemented yet."""
        raise NotImplementedError("LambdaExecutor not yet implemented")
# Registry mapping MPR_EXECUTOR values to executor classes.
_executors: Dict[str, type] = {
    "local": LocalExecutor,
    "lambda": LambdaExecutor,
}

# Lazily-created singleton, shared by all callers of get_executor().
_executor_instance: Optional[Executor] = None


def get_executor() -> Executor:
    """Return the process-wide executor selected by MPR_EXECUTOR.

    The backend name is matched case-insensitively against the registry.

    Raises:
        ValueError: if MPR_EXECUTOR names an unknown backend.
    """
    global _executor_instance
    if _executor_instance is None:
        executor_type = MPR_EXECUTOR.lower()
        executor_cls = _executors.get(executor_type)
        if executor_cls is None:
            raise ValueError(f"Unknown executor type: {executor_type}")
        _executor_instance = executor_cls()
    return _executor_instance

96
worker/tasks.py Normal file
View File

@@ -0,0 +1,96 @@
"""
Celery tasks for job processing.
"""
import logging
import os
from typing import Any, Dict, Optional
from celery import shared_task
from grpc.server import update_job_progress
from worker.executor import get_executor
logger = logging.getLogger(__name__)
# Media paths from environment
MEDIA_ROOT = os.environ.get("MEDIA_ROOT", "/app/media")
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def run_transcode_job(
    self,
    job_id: str,
    source_path: str,
    output_path: str,
    preset: Optional[Dict[str, Any]] = None,
    trim_start: Optional[float] = None,
    trim_end: Optional[float] = None,
    duration: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Celery task to run a transcode/trim job.

    Delegates the actual work to the configured executor and mirrors
    progress into the gRPC server's in-memory state via
    update_job_progress(). Retries up to 3 times (60s delay) on failure.

    NOTE(review): update_job_progress mutates this process's _active_jobs
    dict; if the Celery worker runs in a different process/container than
    the gRPC server (as the compose file suggests), these updates will not
    be visible to StreamProgress — confirm the deployment model.

    Args:
        job_id: Unique job identifier
        source_path: Path to source file
        output_path: Path for output file
        preset: Transcode preset dict (optional; None = trim-only)
        trim_start: Trim start time in seconds (optional)
        trim_end: Trim end time in seconds (optional)
        duration: Source duration for progress calculation

    Returns:
        Result dict with status ("completed"/"failed"), job_id, and either
        output_path or error text.
    """
    logger.info(f"Starting job {job_id}: {source_path} -> {output_path}")
    # Update status to processing
    update_job_progress(job_id, progress=0, status="processing")

    def progress_callback(percent: int, details: Dict[str, Any]) -> None:
        """Update gRPC progress state with each executor progress tick."""
        update_job_progress(
            job_id,
            progress=percent,
            current_time=details.get("time", 0.0),
            status="processing",
        )

    try:
        executor = get_executor()
        success = executor.run(
            job_id=job_id,
            source_path=source_path,
            output_path=output_path,
            preset=preset,
            trim_start=trim_start,
            trim_end=trim_end,
            duration=duration,
            progress_callback=progress_callback,
        )
        if success:
            logger.info(f"Job {job_id} completed successfully")
            update_job_progress(job_id, progress=100, status="completed")
            return {
                "status": "completed",
                "job_id": job_id,
                "output_path": output_path,
            }
        else:
            # Executor reported failure without raising; convert to an
            # exception so the retry path below handles both cases.
            raise RuntimeError("Executor returned False")
    except Exception as e:
        logger.exception(f"Job {job_id} failed: {e}")
        # Progress is reset to 0 on failure so clients see the job as
        # restarted rather than stalled at a partial percentage.
        update_job_progress(job_id, progress=0, status="failed", error=str(e))
        # Retry on transient errors; self.retry() raises Retry, so the
        # explicit raise only matters for type checkers/readability.
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e)
        # Retries exhausted: return a failure result instead of raising.
        return {
            "status": "failed",
            "job_id": job_id,
            "error": str(e),
        }