From b88f75fce0d6afb5a1118dd0ec6373985d27d4b3 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Tue, 3 Feb 2026 14:40:12 -0300 Subject: [PATCH] better module naming --- ctrl/docker-compose.yml | 2 +- grpc/__init__.py | 21 --- mpr/grpc/__init__.py | 10 + {grpc => mpr/grpc}/client.py | 6 +- {grpc => mpr/grpc}/protos/worker.proto | 0 {grpc => mpr/grpc}/server.py | 6 +- mpr/grpc/worker_pb2.py | 52 +++++ mpr/grpc/worker_pb2_grpc.py | 250 +++++++++++++++++++++++++ mpr/media_assets/admin.py | 19 +- requirements.txt | 4 + worker/tasks.py | 2 +- 11 files changed, 331 insertions(+), 41 deletions(-) delete mode 100644 grpc/__init__.py create mode 100644 mpr/grpc/__init__.py rename {grpc => mpr/grpc}/client.py (98%) rename {grpc => mpr/grpc}/protos/worker.proto (100%) rename {grpc => mpr/grpc}/server.py (98%) create mode 100644 mpr/grpc/worker_pb2.py create mode 100644 mpr/grpc/worker_pb2_grpc.py diff --git a/ctrl/docker-compose.yml b/ctrl/docker-compose.yml index 4d5b4ea..0df9369 100644 --- a/ctrl/docker-compose.yml +++ b/ctrl/docker-compose.yml @@ -99,7 +99,7 @@ services: build: context: .. dockerfile: ctrl/Dockerfile - command: python -m grpc.server + command: python -m mpr.grpc.server ports: - "50051:50051" environment: diff --git a/grpc/__init__.py b/grpc/__init__.py deleted file mode 100644 index c5a6faa..0000000 --- a/grpc/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -MPR gRPC Module - -Provides gRPC server and client for worker communication. - -Generated stubs (worker_pb2.py, worker_pb2_grpc.py) are created by: - python schema/generate.py --proto - -Requires: grpcio, grpcio-tools -""" - -from .client import WorkerClient, get_client -from .server import WorkerServicer, serve, update_job_progress - -__all__ = [ - "WorkerClient", - "WorkerServicer", - "get_client", - "serve", - "update_job_progress", -] diff --git a/mpr/grpc/__init__.py b/mpr/grpc/__init__.py new file mode 100644 index 0000000..ae40a85 --- /dev/null +++ b/mpr/grpc/__init__.py @@ -0,0 +1,10 @@ +""" +MPR gRPC Module + +Provides gRPC server and client for worker communication. + +Generated stubs (worker_pb2.py, worker_pb2_grpc.py) are created by: + python schema/generate.py --proto + +Requires: grpcio, grpcio-tools +""" diff --git a/grpc/client.py b/mpr/grpc/client.py similarity index 98% rename from grpc/client.py rename to mpr/grpc/client.py index 0e862f2..98634f0 100644 --- a/grpc/client.py +++ b/mpr/grpc/client.py @@ -10,11 +10,7 @@ from typing import Callable, Iterator, Optional import grpc # Generated stubs - run `python schema/generate.py --proto` if missing -try: - from . import worker_pb2, worker_pb2_grpc -except ImportError: - import worker_pb2 - import worker_pb2_grpc +from . import worker_pb2, worker_pb2_grpc logger = logging.getLogger(__name__) diff --git a/grpc/protos/worker.proto b/mpr/grpc/protos/worker.proto similarity index 100% rename from grpc/protos/worker.proto rename to mpr/grpc/protos/worker.proto diff --git a/grpc/server.py b/mpr/grpc/server.py similarity index 98% rename from grpc/server.py rename to mpr/grpc/server.py index 9dc9004..a994fcd 100644 --- a/grpc/server.py +++ b/mpr/grpc/server.py @@ -18,11 +18,7 @@ GRPC_PORT = int(os.environ.get("GRPC_PORT", "50051")) GRPC_MAX_WORKERS = int(os.environ.get("GRPC_MAX_WORKERS", "10")) # Generated stubs - run `python schema/generate.py --proto` if missing -try: - from . import worker_pb2, worker_pb2_grpc -except ImportError: - import worker_pb2 - import worker_pb2_grpc +from . import worker_pb2, worker_pb2_grpc logger = logging.getLogger(__name__) diff --git a/mpr/grpc/worker_pb2.py b/mpr/grpc/worker_pb2.py new file mode 100644 index 0000000..80e125c --- /dev/null +++ b/mpr/grpc/worker_pb2.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: worker.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'worker.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cworker.proto\x12\nmpr.worker\"\xa7\x01\n\nJobRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x13\n\x0bsource_path\x18\x02 \x01(\t\x12\x13\n\x0boutput_path\x18\x03 \x01(\t\x12\x13\n\x0bpreset_json\x18\x04 \x01(\t\x12\x17\n\ntrim_start\x18\x05 \x01(\x02H\x00\x88\x01\x01\x12\x15\n\x08trim_end\x18\x06 \x01(\x02H\x01\x88\x01\x01\x42\r\n\x0b_trim_startB\x0b\n\t_trim_end\"@\n\x0bJobResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08\x61\x63\x63\x65pted\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"!\n\x0fProgressRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\x9c\x01\n\x0eProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08progress\x18\x02 \x01(\x05\x12\x15\n\rcurrent_frame\x18\x03 \x01(\x05\x12\x14\n\x0c\x63urrent_time\x18\x04 \x01(\x02\x12\r\n\x05speed\x18\x05 \x01(\x02\x12\x0e\n\x06status\x18\x06 \x01(\t\x12\x12\n\x05\x65rror\x18\x07 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x1f\n\rCancelRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"D\n\x0e\x43\x61ncelResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x11\n\tcancelled\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"g\n\x0cWorkerStatus\x12\x11\n\tavailable\x18\x01 \x01(\x08\x12\x13\n\x0b\x61\x63tive_jobs\x18\x02 \x01(\x05\x12\x18\n\x10supported_codecs\x18\x03 \x03(\t\x12\x15\n\rgpu_available\x18\x04 \x01(\x08\"\x07\n\x05\x45mpty2\x9e\x02\n\rWorkerService\x12<\n\tSubmitJob\x12\x16.mpr.worker.JobRequest\x1a\x17.mpr.worker.JobResponse\x12K\n\x0eStreamProgress\x12\x1b.mpr.worker.ProgressRequest\x1a\x1a.mpr.worker.ProgressUpdate0\x01\x12\x42\n\tCancelJob\x12\x19.mpr.worker.CancelRequest\x1a\x1a.mpr.worker.CancelResponse\x12>\n\x0fGetWorkerStatus\x12\x11.mpr.worker.Empty\x1a\x18.mpr.worker.WorkerStatusb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'worker_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_JOBREQUEST']._serialized_start=29 + _globals['_JOBREQUEST']._serialized_end=196 + _globals['_JOBRESPONSE']._serialized_start=198 + _globals['_JOBRESPONSE']._serialized_end=262 + _globals['_PROGRESSREQUEST']._serialized_start=264 + _globals['_PROGRESSREQUEST']._serialized_end=297 + _globals['_PROGRESSUPDATE']._serialized_start=300 + _globals['_PROGRESSUPDATE']._serialized_end=456 + _globals['_CANCELREQUEST']._serialized_start=458 + _globals['_CANCELREQUEST']._serialized_end=489 + _globals['_CANCELRESPONSE']._serialized_start=491 + _globals['_CANCELRESPONSE']._serialized_end=559 + _globals['_WORKERSTATUS']._serialized_start=561 + _globals['_WORKERSTATUS']._serialized_end=664 + _globals['_EMPTY']._serialized_start=666 + _globals['_EMPTY']._serialized_end=673 + _globals['_WORKERSERVICE']._serialized_start=676 + _globals['_WORKERSERVICE']._serialized_end=962 +# @@protoc_insertion_point(module_scope) diff --git a/mpr/grpc/worker_pb2_grpc.py b/mpr/grpc/worker_pb2_grpc.py new file mode 100644 index 0000000..d978e98 --- /dev/null +++ b/mpr/grpc/worker_pb2_grpc.py @@ -0,0 +1,250 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" + +import warnings + +import grpc + +from . import worker_pb2 as worker__pb2 + +GRPC_GENERATED_VERSION = "1.76.0" +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + + _version_not_supported = first_version_is_lower( + GRPC_VERSION, GRPC_GENERATED_VERSION + ) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f"The grpc package installed is at version {GRPC_VERSION}," + + " but the generated code in worker_pb2_grpc.py depends on" + + f" grpcio>={GRPC_GENERATED_VERSION}." + + f" Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}" + + f" or downgrade your generated code using grpcio-tools<={GRPC_VERSION}." + ) + + +class WorkerServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SubmitJob = channel.unary_unary( + "/mpr.worker.WorkerService/SubmitJob", + request_serializer=worker__pb2.JobRequest.SerializeToString, + response_deserializer=worker__pb2.JobResponse.FromString, + _registered_method=True, + ) + self.StreamProgress = channel.unary_stream( + "/mpr.worker.WorkerService/StreamProgress", + request_serializer=worker__pb2.ProgressRequest.SerializeToString, + response_deserializer=worker__pb2.ProgressUpdate.FromString, + _registered_method=True, + ) + self.CancelJob = channel.unary_unary( + "/mpr.worker.WorkerService/CancelJob", + request_serializer=worker__pb2.CancelRequest.SerializeToString, + response_deserializer=worker__pb2.CancelResponse.FromString, + _registered_method=True, + ) + self.GetWorkerStatus = channel.unary_unary( + "/mpr.worker.WorkerService/GetWorkerStatus", + request_serializer=worker__pb2.Empty.SerializeToString, + response_deserializer=worker__pb2.WorkerStatus.FromString, + _registered_method=True, + ) + + +class WorkerServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def SubmitJob(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def StreamProgress(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def CancelJob(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def GetWorkerStatus(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_WorkerServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + "SubmitJob": grpc.unary_unary_rpc_method_handler( + servicer.SubmitJob, + request_deserializer=worker__pb2.JobRequest.FromString, + response_serializer=worker__pb2.JobResponse.SerializeToString, + ), + "StreamProgress": grpc.unary_stream_rpc_method_handler( + servicer.StreamProgress, + request_deserializer=worker__pb2.ProgressRequest.FromString, + response_serializer=worker__pb2.ProgressUpdate.SerializeToString, + ), + "CancelJob": grpc.unary_unary_rpc_method_handler( + servicer.CancelJob, + request_deserializer=worker__pb2.CancelRequest.FromString, + response_serializer=worker__pb2.CancelResponse.SerializeToString, + ), + "GetWorkerStatus": grpc.unary_unary_rpc_method_handler( + servicer.GetWorkerStatus, + request_deserializer=worker__pb2.Empty.FromString, + response_serializer=worker__pb2.WorkerStatus.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "mpr.worker.WorkerService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers( + "mpr.worker.WorkerService", rpc_method_handlers + ) + + +# This class is part of an EXPERIMENTAL API. +class WorkerService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def SubmitJob( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/mpr.worker.WorkerService/SubmitJob", + worker__pb2.JobRequest.SerializeToString, + worker__pb2.JobResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) + + @staticmethod + def StreamProgress( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/mpr.worker.WorkerService/StreamProgress", + worker__pb2.ProgressRequest.SerializeToString, + worker__pb2.ProgressUpdate.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) + + @staticmethod + def CancelJob( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/mpr.worker.WorkerService/CancelJob", + worker__pb2.CancelRequest.SerializeToString, + worker__pb2.CancelResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) + + @staticmethod + def GetWorkerStatus( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/mpr.worker.WorkerService/GetWorkerStatus", + worker__pb2.Empty.SerializeToString, + worker__pb2.WorkerStatus.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) diff --git a/mpr/media_assets/admin.py b/mpr/media_assets/admin.py index 9d4180e..45591b3 100644 --- a/mpr/media_assets/admin.py +++ b/mpr/media_assets/admin.py @@ -108,14 +108,13 @@ class TranscodePresetAdmin(admin.ModelAdmin): class TranscodeJobAdmin(admin.ModelAdmin): list_display = [ "id_short", - "source_asset", - "preset", + "source_asset_id_short", "status", "progress_display", "created_at", ] - list_filter = ["status", "preset"] - search_fields = ["source_asset__filename", "output_filename"] + list_filter = ["status"] + search_fields = ["output_filename"] readonly_fields = [ "id", "created_at", @@ -128,15 +127,14 @@ class TranscodeJobAdmin(admin.ModelAdmin): "celery_task_id", "preset_snapshot", ] - raw_id_fields = ["source_asset", "preset", "output_asset"] fieldsets = [ - (None, {"fields": ["id", "source_asset", "status", "error_message"]}), + (None, {"fields": ["id", "source_asset_id", "status", "error_message"]}), ( "Configuration", { "fields": [ - "preset", + "preset_id", "preset_snapshot", "trim_start", "trim_end", @@ -144,7 +142,7 @@ class TranscodeJobAdmin(admin.ModelAdmin): ] }, ), - ("Output", {"fields": ["output_filename", "output_path", "output_asset"]}), + ("Output", {"fields": ["output_filename", "output_path", "output_asset_id"]}), ( "Progress", {"fields": ["progress", "current_frame", "current_time", "speed"]}, @@ -168,6 +166,11 @@ class TranscodeJobAdmin(admin.ModelAdmin): id_short.short_description = "ID" + def source_asset_id_short(self, obj): + return str(obj.source_asset_id)[:8] if obj.source_asset_id else "-" + + source_asset_id_short.short_description = "Source" + def progress_display(self, obj): return f"{obj.progress:.1f}%" diff --git a/requirements.txt b/requirements.txt index a6dcb08..c1156a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,10 @@ redis>=5.0.0 # FFmpeg ffmpeg-python>=0.2.0 +# gRPC +grpcio>=1.60.0 +grpcio-tools>=1.60.0 + # Testing pytest>=7.4.0 pytest-django>=4.7.0 diff --git a/worker/tasks.py b/worker/tasks.py index 6a37e09..b4f4e2e 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -8,8 +8,8 @@ from typing import Any, Dict, Optional from celery import shared_task -from grpc.server import update_job_progress from worker.executor import get_executor +from worker_grpc.server import update_job_progress logger = logging.getLogger(__name__)