From a5057ba412b7ee1020be0e7300e2edec50cdce23 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Tue, 3 Feb 2026 13:40:28 -0300 Subject: [PATCH] gRPC and worker --- .env.template | 23 ---- ctrl/.env.template | 29 +++-- ctrl/docker-compose.yml | 24 ++++ grpc/__init__.py | 21 ++++ grpc/client.py | 208 +++++++++++++++++++++++++++++++++ grpc/server.py | 251 ++++++++++++++++++++++++++++++++++++++++ worker/__init__.py | 15 +++ worker/executor.py | 149 ++++++++++++++++++++++++ worker/tasks.py | 96 +++++++++++++++ 9 files changed, 782 insertions(+), 34 deletions(-) delete mode 100644 .env.template create mode 100644 grpc/__init__.py create mode 100644 grpc/client.py create mode 100644 grpc/server.py create mode 100644 worker/__init__.py create mode 100644 worker/executor.py create mode 100644 worker/tasks.py diff --git a/.env.template b/.env.template deleted file mode 100644 index 03bde7b..0000000 --- a/.env.template +++ /dev/null @@ -1,23 +0,0 @@ -# MPR Environment Configuration -# Copy to .env and adjust values as needed - -# Database -POSTGRES_DB=mpr -POSTGRES_USER=mpr_user -POSTGRES_PASSWORD=mpr_pass -POSTGRES_HOST=postgres -POSTGRES_PORT=5432 -DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB} - -# Redis -REDIS_HOST=redis -REDIS_PORT=6379 -REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}/0 - -# Django -DEBUG=1 -DJANGO_SETTINGS_MODULE=mpr.settings -SECRET_KEY=change-this-in-production - -# Worker -MPR_EXECUTOR=local diff --git a/ctrl/.env.template b/ctrl/.env.template index b135ada..465b8bf 100644 --- a/ctrl/.env.template +++ b/ctrl/.env.template @@ -1,21 +1,28 @@ -# MPR Control Environment -# Copy to .env and adjust values +# MPR Environment Configuration +# Copy to .env and adjust values as needed # Database POSTGRES_DB=mpr POSTGRES_USER=mpr_user POSTGRES_PASSWORD=mpr_pass +POSTGRES_HOST=postgres +POSTGRES_PORT=5432 +DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB} -# Ports (less common to avoid conflicts) -POSTGRES_PORT=5433 -REDIS_PORT=6380 -DJANGO_PORT=8701 -FASTAPI_PORT=8702 -TIMELINE_PORT=5173 +# Redis +REDIS_HOST=redis +REDIS_PORT=6379 +REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}/0 + +# Django +DEBUG=1 +DJANGO_SETTINGS_MODULE=mpr.settings +SECRET_KEY=change-this-in-production # Worker MPR_EXECUTOR=local -# Remote deployment (optional) -# SERVER=user@host -# REMOTE_PATH=~/mpr +# gRPC +GRPC_HOST=grpc +GRPC_PORT=50051 +GRPC_MAX_WORKERS=10 diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml index 5d52a7f..f00cb2b 100644 --- a/ctrl/docker-compose.yml +++ b/ctrl/docker-compose.yml @@ -3,6 +3,8 @@ x-common-env: &common-env REDIS_URL: redis://redis:6379/0 DJANGO_SETTINGS_MODULE: mpr.settings DEBUG: 1 + GRPC_HOST: grpc + GRPC_PORT: 50051 x-healthcheck-defaults: &healthcheck-defaults interval: 5s @@ -93,6 +95,26 @@ services: redis: condition: service_healthy + grpc: + build: + context: .. + dockerfile: ctrl/Dockerfile + command: python -m grpc.server + ports: + - "50051:50051" + environment: + <<: *common-env + GRPC_PORT: 50051 + GRPC_MAX_WORKERS: 10 + volumes: + - ..:/app + - ../media:/app/media + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + celery: build: context: .. @@ -109,6 +131,8 @@ services: condition: service_healthy redis: condition: service_healthy + grpc: + condition: service_started timeline: build: diff --git a/grpc/__init__.py b/grpc/__init__.py new file mode 100644 index 0000000..c5a6faa --- /dev/null +++ b/grpc/__init__.py @@ -0,0 +1,21 @@ +""" +MPR gRPC Module + +Provides gRPC server and client for worker communication. + +Generated stubs (worker_pb2.py, worker_pb2_grpc.py) are created by: + python schema/generate.py --proto + +Requires: grpcio, grpcio-tools +""" + +from .client import WorkerClient, get_client +from .server import WorkerServicer, serve, update_job_progress + +__all__ = [ + "WorkerClient", + "WorkerServicer", + "get_client", + "serve", + "update_job_progress", +] diff --git a/grpc/client.py b/grpc/client.py new file mode 100644 index 0000000..0e862f2 --- /dev/null +++ b/grpc/client.py @@ -0,0 +1,208 @@ +""" +gRPC Client - Used by FastAPI to communicate with workers. +""" + +import json +import logging +import os +from typing import Callable, Iterator, Optional + +import grpc + +# Generated stubs - run `python schema/generate.py --proto` if missing +try: + from . import worker_pb2, worker_pb2_grpc +except ImportError: + import worker_pb2 + import worker_pb2_grpc + +logger = logging.getLogger(__name__) + +# Configuration from environment +GRPC_HOST = os.environ.get("GRPC_HOST", "grpc") +GRPC_PORT = int(os.environ.get("GRPC_PORT", "50051")) + + +class WorkerClient: + """gRPC client for worker communication.""" + + def __init__(self, host: str = None, port: int = None): + """ + Initialize the client. + + Args: + host: gRPC server host (defaults to GRPC_HOST env var) + port: gRPC server port (defaults to GRPC_PORT env var) + """ + self.host = host or GRPC_HOST + self.port = port or GRPC_PORT + self.address = f"{self.host}:{self.port}" + self._channel: Optional[grpc.Channel] = None + self._stub: Optional[worker_pb2_grpc.WorkerServiceStub] = None + + def _ensure_connected(self) -> worker_pb2_grpc.WorkerServiceStub: + """Ensure channel is connected and return stub.""" + if self._channel is None: + self._channel = grpc.insecure_channel(self.address) + self._stub = worker_pb2_grpc.WorkerServiceStub(self._channel) + return self._stub + + def close(self) -> None: + """Close the channel.""" + if self._channel: + self._channel.close() + self._channel = None + self._stub = None + + def __enter__(self): + self._ensure_connected() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def submit_job( + self, + job_id: str, + source_path: str, + output_path: str, + preset: Optional[dict] = None, + trim_start: Optional[float] = None, + trim_end: Optional[float] = None, + ) -> tuple[bool, str]: + """ + Submit a job to the worker. + + Args: + job_id: Unique job identifier + source_path: Path to source file + output_path: Path for output file + preset: Transcode preset dict (optional) + trim_start: Trim start time in seconds (optional) + trim_end: Trim end time in seconds (optional) + + Returns: + Tuple of (accepted: bool, message: str) + """ + stub = self._ensure_connected() + + request = worker_pb2.JobRequest( + job_id=job_id, + source_path=source_path, + output_path=output_path, + preset_json=json.dumps(preset) if preset else "", + ) + + if trim_start is not None: + request.trim_start = trim_start + if trim_end is not None: + request.trim_end = trim_end + + try: + response = stub.SubmitJob(request) + return response.accepted, response.message + except grpc.RpcError as e: + logger.error(f"SubmitJob RPC failed: {e}") + return False, str(e) + + def stream_progress( + self, + job_id: str, + callback: Optional[Callable[[dict], None]] = None, + ) -> Iterator[dict]: + """ + Stream progress updates for a job. + + Args: + job_id: Job identifier + callback: Optional callback for each update + + Yields: + Progress update dicts + """ + stub = self._ensure_connected() + + request = worker_pb2.ProgressRequest(job_id=job_id) + + try: + for update in stub.StreamProgress(request): + progress = { + "job_id": update.job_id, + "progress": update.progress, + "current_frame": update.current_frame, + "current_time": update.current_time, + "speed": update.speed, + "status": update.status, + "error": update.error if update.HasField("error") else None, + } + + if callback: + callback(progress) + + yield progress + + if update.status in ("completed", "failed", "cancelled"): + break + + except grpc.RpcError as e: + logger.error(f"StreamProgress RPC failed: {e}") + yield { + "job_id": job_id, + "progress": 0, + "status": "error", + "error": str(e), + } + + def cancel_job(self, job_id: str) -> tuple[bool, str]: + """ + Cancel a running job. + + Args: + job_id: Job identifier + + Returns: + Tuple of (cancelled: bool, message: str) + """ + stub = self._ensure_connected() + + request = worker_pb2.CancelRequest(job_id=job_id) + + try: + response = stub.CancelJob(request) + return response.cancelled, response.message + except grpc.RpcError as e: + logger.error(f"CancelJob RPC failed: {e}") + return False, str(e) + + def get_worker_status(self) -> Optional[dict]: + """ + Get worker status and capabilities. + + Returns: + Status dict or None on error + """ + stub = self._ensure_connected() + + try: + response = stub.GetWorkerStatus(worker_pb2.Empty()) + return { + "available": response.available, + "active_jobs": response.active_jobs, + "supported_codecs": list(response.supported_codecs), + "gpu_available": response.gpu_available, + } + except grpc.RpcError as e: + logger.error(f"GetWorkerStatus RPC failed: {e}") + return None + + +# Singleton client instance +_client: Optional[WorkerClient] = None + + +def get_client() -> WorkerClient: + """Get or create the singleton client (uses env vars for config).""" + global _client + if _client is None: + _client = WorkerClient() + return _client diff --git a/grpc/server.py b/grpc/server.py new file mode 100644 index 0000000..9dc9004 --- /dev/null +++ b/grpc/server.py @@ -0,0 +1,251 @@ +""" +gRPC Server - Worker Service Implementation + +Runs in the worker process to handle job submissions and progress streaming. +""" + +import json +import logging +import os +import time +from concurrent import futures +from typing import Iterator + +import grpc + +# Configuration from environment +GRPC_PORT = int(os.environ.get("GRPC_PORT", "50051")) +GRPC_MAX_WORKERS = int(os.environ.get("GRPC_MAX_WORKERS", "10")) + +# Generated stubs - run `python schema/generate.py --proto` if missing +try: + from . import worker_pb2, worker_pb2_grpc +except ImportError: + import worker_pb2 + import worker_pb2_grpc + +logger = logging.getLogger(__name__) + +# Active jobs progress tracking (shared state for streaming) +_active_jobs: dict[str, dict] = {} + + +class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer): + """gRPC service implementation for worker operations.""" + + def __init__(self, celery_app=None): + """ + Initialize the servicer. + + Args: + celery_app: Optional Celery app for task dispatch + """ + self.celery_app = celery_app + + def SubmitJob(self, request, context): + """Submit a transcode/trim job to the worker.""" + job_id = request.job_id + logger.info(f"SubmitJob: {job_id}") + + try: + # Parse preset + preset = json.loads(request.preset_json) if request.preset_json else None + + # Initialize progress tracking + _active_jobs[job_id] = { + "status": "pending", + "progress": 0, + "current_frame": 0, + "current_time": 0.0, + "speed": 0.0, + "error": None, + } + + # Dispatch to Celery if available + if self.celery_app: + from worker.tasks import run_transcode_job + + task = run_transcode_job.delay( + job_id=job_id, + source_path=request.source_path, + output_path=request.output_path, + preset=preset, + trim_start=request.trim_start + if request.HasField("trim_start") + else None, + trim_end=request.trim_end if request.HasField("trim_end") else None, + ) + _active_jobs[job_id]["celery_task_id"] = task.id + + return worker_pb2.JobResponse( + job_id=job_id, + accepted=True, + message="Job submitted", + ) + + except Exception as e: + logger.exception(f"SubmitJob failed: {e}") + return worker_pb2.JobResponse( + job_id=job_id, + accepted=False, + message=str(e), + ) + + def StreamProgress(self, request, context) -> Iterator[worker_pb2.ProgressUpdate]: + """Stream progress updates for a job.""" + job_id = request.job_id + logger.info(f"StreamProgress: {job_id}") + + # Check if job exists + if job_id not in _active_jobs: + yield worker_pb2.ProgressUpdate( + job_id=job_id, + progress=0, + status="not_found", + error="Job not found", + ) + return + + # Stream updates until job completes + last_progress = -1 + while True: + if context.cancelled(): + logger.info(f"StreamProgress cancelled: {job_id}") + break + + job_state = _active_jobs.get(job_id) + if not job_state: + break + + # Only yield if progress changed + if job_state["progress"] != last_progress: + last_progress = job_state["progress"] + + yield worker_pb2.ProgressUpdate( + job_id=job_id, + progress=job_state["progress"], + current_frame=job_state.get("current_frame", 0), + current_time=job_state.get("current_time", 0.0), + speed=job_state.get("speed", 0.0), + status=job_state["status"], + error=job_state.get("error"), + ) + + # Exit if job is done + if job_state["status"] in ("completed", "failed", "cancelled"): + break + + # Small delay to avoid busy loop + time.sleep(0.1) + + # Cleanup completed jobs + if job_id in _active_jobs: + status = _active_jobs[job_id].get("status") + if status in ("completed", "failed", "cancelled"): + _active_jobs.pop(job_id, None) + + def CancelJob(self, request, context): + """Cancel a running job.""" + job_id = request.job_id + logger.info(f"CancelJob: {job_id}") + + if job_id in _active_jobs: + _active_jobs[job_id]["status"] = "cancelled" + + # Revoke Celery task if available + if self.celery_app: + task_id = _active_jobs[job_id].get("celery_task_id") + if task_id: + self.celery_app.control.revoke(task_id, terminate=True) + + return worker_pb2.CancelResponse( + job_id=job_id, + cancelled=True, + message="Job cancelled", + ) + + return worker_pb2.CancelResponse( + job_id=job_id, + cancelled=False, + message="Job not found", + ) + + def GetWorkerStatus(self, request, context): + """Get worker health and capabilities.""" + try: + from core.ffmpeg import get_encoders + + encoders = get_encoders() + codec_names = [e["name"] for e in encoders.get("video", [])] + except Exception: + codec_names = [] + + # Check for GPU encoders + gpu_available = any( + "nvenc" in name or "vaapi" in name or "qsv" in name for name in codec_names + ) + + return worker_pb2.WorkerStatus( + available=True, + active_jobs=len(_active_jobs), + supported_codecs=codec_names[:20], # Limit to 20 + gpu_available=gpu_available, + ) + + +def update_job_progress( + job_id: str, + progress: int, + current_frame: int = 0, + current_time: float = 0.0, + speed: float = 0.0, + status: str = "processing", + error: str = None, +) -> None: + """ + Update job progress (called from worker tasks). + + This updates the in-memory state that StreamProgress reads from. + """ + if job_id in _active_jobs: + _active_jobs[job_id].update( + { + "progress": progress, + "current_frame": current_frame, + "current_time": current_time, + "speed": speed, + "status": status, + "error": error, + } + ) + + +def serve(port: int = None, celery_app=None) -> grpc.Server: + """ + Start the gRPC server. + + Args: + port: Port to listen on (defaults to GRPC_PORT env var) + celery_app: Optional Celery app for task dispatch + + Returns: + The running gRPC server + """ + if port is None: + port = GRPC_PORT + + server = grpc.server(futures.ThreadPoolExecutor(max_workers=GRPC_MAX_WORKERS)) + worker_pb2_grpc.add_WorkerServiceServicer_to_server( + WorkerServicer(celery_app=celery_app), + server, + ) + server.add_insecure_port(f"[::]:{port}") + server.start() + logger.info(f"gRPC server started on port {port}") + return server + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + server = serve() + server.wait_for_termination() diff --git a/worker/__init__.py b/worker/__init__.py new file mode 100644 index 0000000..fea2bad --- /dev/null +++ b/worker/__init__.py @@ -0,0 +1,15 @@ +""" +MPR Worker Module + +Provides executor abstraction and Celery tasks for job processing. +""" + +from .executor import Executor, LocalExecutor, get_executor +from .tasks import run_transcode_job + +__all__ = [ + "Executor", + "LocalExecutor", + "get_executor", + "run_transcode_job", +] diff --git a/worker/executor.py b/worker/executor.py new file mode 100644 index 0000000..9096998 --- /dev/null +++ b/worker/executor.py @@ -0,0 +1,149 @@ +""" +Executor abstraction for job processing. + +Supports different backends: +- LocalExecutor: FFmpeg via Celery (default) +- LambdaExecutor: AWS Lambda (future) +""" + +import os +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, Optional + +from core.ffmpeg.transcode import TranscodeConfig, transcode + +# Configuration from environment +MPR_EXECUTOR = os.environ.get("MPR_EXECUTOR", "local") + + +class Executor(ABC): + """Abstract base class for job executors.""" + + @abstractmethod + def run( + self, + job_id: str, + source_path: str, + output_path: str, + preset: Optional[Dict[str, Any]] = None, + trim_start: Optional[float] = None, + trim_end: Optional[float] = None, + duration: Optional[float] = None, + progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, + ) -> bool: + """ + Execute a transcode/trim job. + + Args: + job_id: Unique job identifier + source_path: Path to source file + output_path: Path for output file + preset: Transcode preset dict (optional, None = trim only) + trim_start: Trim start time in seconds (optional) + trim_end: Trim end time in seconds (optional) + duration: Source duration in seconds (for progress calculation) + progress_callback: Called with (percent, details_dict) + + Returns: + True if successful + """ + pass + + +class LocalExecutor(Executor): + """Execute jobs locally using FFmpeg.""" + + def run( + self, + job_id: str, + source_path: str, + output_path: str, + preset: Optional[Dict[str, Any]] = None, + trim_start: Optional[float] = None, + trim_end: Optional[float] = None, + duration: Optional[float] = None, + progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, + ) -> bool: + """Execute job using local FFmpeg.""" + + # Build config from preset or use stream copy for trim-only + if preset: + config = TranscodeConfig( + input_path=source_path, + output_path=output_path, + video_codec=preset.get("video_codec", "libx264"), + video_bitrate=preset.get("video_bitrate"), + video_crf=preset.get("video_crf"), + video_preset=preset.get("video_preset"), + resolution=preset.get("resolution"), + framerate=preset.get("framerate"), + audio_codec=preset.get("audio_codec", "aac"), + audio_bitrate=preset.get("audio_bitrate"), + audio_channels=preset.get("audio_channels"), + audio_samplerate=preset.get("audio_samplerate"), + container=preset.get("container", "mp4"), + extra_args=preset.get("extra_args", []), + trim_start=trim_start, + trim_end=trim_end, + ) + else: + # Trim-only: stream copy + config = TranscodeConfig( + input_path=source_path, + output_path=output_path, + video_codec="copy", + audio_codec="copy", + trim_start=trim_start, + trim_end=trim_end, + ) + + # Wrapper to convert float percent to int + def wrapped_callback(percent: float, details: Dict[str, Any]) -> None: + if progress_callback: + progress_callback(int(percent), details) + + return transcode( + config, + duration=duration, + progress_callback=wrapped_callback if progress_callback else None, + ) + + +class LambdaExecutor(Executor): + """Execute jobs via AWS Lambda (future implementation).""" + + def run( + self, + job_id: str, + source_path: str, + output_path: str, + preset: Optional[Dict[str, Any]] = None, + trim_start: Optional[float] = None, + trim_end: Optional[float] = None, + duration: Optional[float] = None, + progress_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None, + ) -> bool: + """Execute job via AWS Lambda.""" + raise NotImplementedError("LambdaExecutor not yet implemented") + + +# Executor registry +_executors: Dict[str, type] = { + "local": LocalExecutor, + "lambda": LambdaExecutor, +} + +_executor_instance: Optional[Executor] = None + + +def get_executor() -> Executor: + """Get the configured executor instance.""" + global _executor_instance + + if _executor_instance is None: + executor_type = MPR_EXECUTOR.lower() + if executor_type not in _executors: + raise ValueError(f"Unknown executor type: {executor_type}") + _executor_instance = _executors[executor_type]() + + return _executor_instance diff --git a/worker/tasks.py b/worker/tasks.py new file mode 100644 index 0000000..6a37e09 --- /dev/null +++ b/worker/tasks.py @@ -0,0 +1,96 @@ +""" +Celery tasks for job processing. +""" + +import logging +import os +from typing import Any, Dict, Optional + +from celery import shared_task + +from grpc.server import update_job_progress +from worker.executor import get_executor + +logger = logging.getLogger(__name__) + +# Media paths from environment +MEDIA_ROOT = os.environ.get("MEDIA_ROOT", "/app/media") + + +@shared_task(bind=True, max_retries=3, default_retry_delay=60) +def run_transcode_job( + self, + job_id: str, + source_path: str, + output_path: str, + preset: Optional[Dict[str, Any]] = None, + trim_start: Optional[float] = None, + trim_end: Optional[float] = None, + duration: Optional[float] = None, +) -> Dict[str, Any]: + """ + Celery task to run a transcode/trim job. + + Args: + job_id: Unique job identifier + source_path: Path to source file + output_path: Path for output file + preset: Transcode preset dict (optional) + trim_start: Trim start time in seconds (optional) + trim_end: Trim end time in seconds (optional) + duration: Source duration for progress calculation + + Returns: + Result dict with status and output_path + """ + logger.info(f"Starting job {job_id}: {source_path} -> {output_path}") + + # Update status to processing + update_job_progress(job_id, progress=0, status="processing") + + def progress_callback(percent: int, details: Dict[str, Any]) -> None: + """Update gRPC progress state.""" + update_job_progress( + job_id, + progress=percent, + current_time=details.get("time", 0.0), + status="processing", + ) + + try: + executor = get_executor() + success = executor.run( + job_id=job_id, + source_path=source_path, + output_path=output_path, + preset=preset, + trim_start=trim_start, + trim_end=trim_end, + duration=duration, + progress_callback=progress_callback, + ) + + if success: + logger.info(f"Job {job_id} completed successfully") + update_job_progress(job_id, progress=100, status="completed") + return { + "status": "completed", + "job_id": job_id, + "output_path": output_path, + } + else: + raise RuntimeError("Executor returned False") + + except Exception as e: + logger.exception(f"Job {job_id} failed: {e}") + update_job_progress(job_id, progress=0, status="failed", error=str(e)) + + # Retry on transient errors + if self.request.retries < self.max_retries: + raise self.retry(exc=e) + + return { + "status": "failed", + "job_id": job_id, + "error": str(e), + }