chunker ui redo
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
"""
|
||||
SSE endpoint for chunker pipeline events.
|
||||
|
||||
Bridges gRPC StreamProgress to browser-native EventSource.
|
||||
GET /api/chunker/stream/{job_id} → text/event-stream
|
||||
Uses Redis as the event bus between Celery workers and the SSE stream.
|
||||
Celery worker pushes events via core.events, SSE endpoint polls them.
|
||||
|
||||
GET /chunker/stream/{job_id} → text/event-stream
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -14,46 +16,39 @@ from typing import AsyncGenerator
|
||||
from fastapi import APIRouter
|
||||
from starlette.responses import StreamingResponse
|
||||
|
||||
from core.events import poll_events
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/chunker", tags=["chunker"])
|
||||
router = APIRouter(prefix="/chunker", tags=["chunker"])
|
||||
|
||||
|
||||
async def _event_generator(job_id: str) -> AsyncGenerator[str, None]:
|
||||
"""
|
||||
Generate SSE events by polling gRPC job state.
|
||||
|
||||
Yields server-sent events in the format:
|
||||
event: <event_type>
|
||||
data: <json_payload>
|
||||
Generate SSE events by polling Redis for chunk job events.
|
||||
"""
|
||||
from core.rpc.server import _active_jobs
|
||||
|
||||
last_state = None
|
||||
cursor = 0
|
||||
timeout = time.monotonic() + 600 # 10 min max
|
||||
|
||||
while time.monotonic() < timeout:
|
||||
job_state = _active_jobs.get(job_id)
|
||||
events, cursor = poll_events(job_id, cursor)
|
||||
|
||||
if job_state is None:
|
||||
# Job not found yet — may not have started
|
||||
if not events:
|
||||
yield f"event: waiting\ndata: {json.dumps({'job_id': job_id})}\n\n"
|
||||
await asyncio.sleep(0.5)
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
# Only send if state changed
|
||||
if job_state != last_state:
|
||||
last_state = dict(job_state)
|
||||
event_type = job_state.get("status", "update")
|
||||
for data in events:
|
||||
event_type = data.pop("event", "update")
|
||||
payload = {**data, "job_id": job_id}
|
||||
|
||||
yield f"event: {event_type}\ndata: {json.dumps({**job_state, 'job_id': job_id})}\n\n"
|
||||
yield f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"
|
||||
|
||||
# End stream when job is terminal
|
||||
if event_type in ("completed", "failed", "cancelled"):
|
||||
if event_type in ("pipeline_complete", "pipeline_error", "cancelled"):
|
||||
yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n"
|
||||
break
|
||||
return
|
||||
|
||||
await asyncio.sleep(0.2)
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
yield f"event: timeout\ndata: {json.dumps({'job_id': job_id})}\n\n"
|
||||
|
||||
|
||||
@@ -15,7 +15,9 @@ from strawberry.schema.config import StrawberryConfig
|
||||
from strawberry.types import Info
|
||||
|
||||
from core.api.schema.graphql import (
|
||||
CancelResultType,
|
||||
ChunkJobType,
|
||||
ChunkOutputFileType,
|
||||
CreateChunkJobInput,
|
||||
CreateJobInput,
|
||||
DeleteResultType,
|
||||
@@ -26,7 +28,7 @@ from core.api.schema.graphql import (
|
||||
TranscodePresetType,
|
||||
UpdateAssetInput,
|
||||
)
|
||||
from core.storage import BUCKET_IN, list_objects
|
||||
from core.storage import BUCKET_IN, list_objects, upload_file
|
||||
|
||||
VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv", ".m4v"}
|
||||
AUDIO_EXTS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"}
|
||||
@@ -90,6 +92,25 @@ class Query:
|
||||
def system_status(self, info: Info) -> SystemStatusType:
|
||||
return SystemStatusType(status="ok", version="0.1.0")
|
||||
|
||||
@strawberry.field
|
||||
def chunk_output_files(self, info: Info, job_id: str) -> List[ChunkOutputFileType]:
|
||||
"""List output chunk files for a completed job from media/out/."""
|
||||
from pathlib import Path
|
||||
|
||||
media_out = os.environ.get("MEDIA_OUT_DIR", "/app/media/out")
|
||||
output_dir = Path(media_out) / "chunks" / job_id
|
||||
if not output_dir.is_dir():
|
||||
return []
|
||||
return [
|
||||
ChunkOutputFileType(
|
||||
key=f.name,
|
||||
size=f.stat().st_size,
|
||||
url=f"/media/out/chunks/{job_id}/{f.name}",
|
||||
)
|
||||
for f in sorted(output_dir.iterdir())
|
||||
if f.is_file()
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mutations
|
||||
@@ -100,8 +121,26 @@ class Query:
|
||||
class Mutation:
|
||||
@strawberry.mutation
|
||||
def scan_media_folder(self, info: Info) -> ScanResultType:
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from core.db import create_asset, get_asset_filenames
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sync local media/in/ files to MinIO (handles fresh installs / pruned volumes)
|
||||
local_media = Path("/app/media/in")
|
||||
if local_media.is_dir():
|
||||
existing_keys = {o["key"] for o in list_objects(BUCKET_IN)}
|
||||
for f in local_media.iterdir():
|
||||
if f.is_file() and f.suffix.lower() in MEDIA_EXTS:
|
||||
if f.name not in existing_keys:
|
||||
try:
|
||||
upload_file(str(f), BUCKET_IN, f.name)
|
||||
logger.info("Uploaded %s to MinIO", f.name)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to upload %s: %s", f.name, e)
|
||||
|
||||
objects = list_objects(BUCKET_IN, extensions=MEDIA_EXTS)
|
||||
existing = get_asset_filenames()
|
||||
|
||||
@@ -284,6 +323,8 @@ class Mutation:
|
||||
"num_workers": input.num_workers,
|
||||
"max_retries": input.max_retries,
|
||||
"processor_type": input.processor_type,
|
||||
"start_time": input.start_time,
|
||||
"end_time": input.end_time,
|
||||
}
|
||||
|
||||
executor_mode = os.environ.get("MPR_EXECUTOR", "local")
|
||||
@@ -320,6 +361,17 @@ class Mutation:
|
||||
celery_task_id=celery_task_id,
|
||||
)
|
||||
|
||||
@strawberry.mutation
|
||||
def cancel_chunk_job(self, info: Info, celery_task_id: str) -> CancelResultType:
|
||||
"""Cancel a running chunk job by revoking its Celery task."""
|
||||
try:
|
||||
from admin.mpr.celery import app as celery_app
|
||||
|
||||
celery_app.control.revoke(celery_task_id, terminate=True, signal="SIGTERM")
|
||||
return CancelResultType(ok=True, message="Task revoked")
|
||||
except Exception as e:
|
||||
return CancelResultType(ok=False, message=str(e))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schema
|
||||
|
||||
@@ -37,7 +37,7 @@ class MediaAssetType:
|
||||
file_path: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
error_message: Optional[str] = None
|
||||
file_size: Optional[int] = None
|
||||
file_size: Optional[float] = None
|
||||
duration: Optional[float] = None
|
||||
video_codec: Optional[str] = None
|
||||
audio_codec: Optional[str] = None
|
||||
@@ -205,3 +205,22 @@ class CreateChunkJobInput:
|
||||
max_retries: int = 3
|
||||
processor_type: str = "ffmpeg"
|
||||
priority: int = 0
|
||||
start_time: Optional[float] = None
|
||||
end_time: Optional[float] = None
|
||||
|
||||
|
||||
@strawberry.type
|
||||
class CancelResultType:
|
||||
"""Result of cancelling a chunk job."""
|
||||
|
||||
ok: bool = False
|
||||
message: Optional[str] = None
|
||||
|
||||
|
||||
@strawberry.type
|
||||
class ChunkOutputFileType:
|
||||
"""A chunk output file in S3/MinIO with presigned download URL."""
|
||||
|
||||
key: str
|
||||
size: int = 0
|
||||
url: str = ""
|
||||
|
||||
@@ -28,7 +28,13 @@ class Chunker:
|
||||
chunk_duration: Duration of each chunk in seconds (default: 10.0)
|
||||
"""
|
||||
|
||||
def __init__(self, file_path: str, chunk_duration: float = 10.0):
|
||||
def __init__(
|
||||
self,
|
||||
file_path: str,
|
||||
chunk_duration: float = 10.0,
|
||||
start_time: float | None = None,
|
||||
end_time: float | None = None,
|
||||
):
|
||||
if not os.path.isfile(file_path):
|
||||
raise ChunkReadError(f"File not found: {file_path}")
|
||||
if chunk_duration <= 0:
|
||||
@@ -37,7 +43,16 @@ class Chunker:
|
||||
self.file_path = file_path
|
||||
self.chunk_duration = chunk_duration
|
||||
self.file_size = os.path.getsize(file_path)
|
||||
self.source_duration = self._probe_duration()
|
||||
full_duration = self._probe_duration()
|
||||
|
||||
# Apply time range
|
||||
self.range_start = max(start_time or 0.0, 0.0)
|
||||
self.range_end = min(end_time or full_duration, full_duration)
|
||||
if self.range_start >= self.range_end:
|
||||
raise ValueError(
|
||||
f"Invalid range: start={self.range_start} >= end={self.range_end}"
|
||||
)
|
||||
self.source_duration = self.range_end - self.range_start
|
||||
|
||||
def _probe_duration(self) -> float:
|
||||
"""Get source file duration via FFmpeg probe."""
|
||||
@@ -71,9 +86,9 @@ class Chunker:
|
||||
"""
|
||||
total = self.expected_chunks
|
||||
for sequence in range(total):
|
||||
start_time = sequence * self.chunk_duration
|
||||
start_time = self.range_start + sequence * self.chunk_duration
|
||||
end_time = min(
|
||||
start_time + self.chunk_duration, self.source_duration
|
||||
start_time + self.chunk_duration, self.range_end
|
||||
)
|
||||
duration = end_time - start_time
|
||||
|
||||
|
||||
@@ -57,6 +57,8 @@ class Pipeline:
|
||||
queue_size: int = 10,
|
||||
event_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
|
||||
output_dir: Optional[str] = None,
|
||||
start_time: Optional[float] = None,
|
||||
end_time: Optional[float] = None,
|
||||
):
|
||||
self.source = source
|
||||
self.chunk_duration = chunk_duration
|
||||
@@ -66,6 +68,8 @@ class Pipeline:
|
||||
self.queue_size = queue_size
|
||||
self.event_callback = event_callback
|
||||
self.output_dir = output_dir
|
||||
self.start_time = start_time
|
||||
self.end_time = end_time
|
||||
|
||||
def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
|
||||
"""Emit an event if callback is registered."""
|
||||
@@ -92,6 +96,19 @@ class Pipeline:
|
||||
finally:
|
||||
chunk_queue.close()
|
||||
|
||||
def _monitor_progress(
|
||||
self, start_time: float, file_size: int, stop_event: threading.Event
|
||||
) -> None:
|
||||
"""Monitor thread: emit pipeline_progress every 500ms."""
|
||||
while not stop_event.is_set():
|
||||
elapsed = time.monotonic() - start_time
|
||||
mb = file_size / (1024 * 1024)
|
||||
self._emit("pipeline_progress", {
|
||||
"elapsed": round(elapsed, 2),
|
||||
"throughput_mbps": round(mb / elapsed, 2) if elapsed > 0 else 0,
|
||||
})
|
||||
stop_event.wait(0.5)
|
||||
|
||||
def _write_manifest(
|
||||
self, result: PipelineResult, source_duration: float
|
||||
) -> None:
|
||||
@@ -146,7 +163,12 @@ class Pipeline:
|
||||
|
||||
try:
|
||||
# Stage 1: Set up chunker (probes file for duration)
|
||||
chunker = Chunker(self.source, self.chunk_duration)
|
||||
chunker = Chunker(
|
||||
self.source,
|
||||
self.chunk_duration,
|
||||
start_time=self.start_time,
|
||||
end_time=self.end_time,
|
||||
)
|
||||
total_chunks = chunker.expected_chunks
|
||||
|
||||
if total_chunks == 0:
|
||||
@@ -170,9 +192,18 @@ class Pipeline:
|
||||
output_dir=self.output_dir,
|
||||
)
|
||||
|
||||
# Stage 3: Start workers, then produce chunks
|
||||
# Stage 3: Start workers, monitor, then produce chunks
|
||||
pool.start()
|
||||
|
||||
monitor_stop = threading.Event()
|
||||
monitor = threading.Thread(
|
||||
target=self._monitor_progress,
|
||||
args=(start_time, chunker.file_size, monitor_stop),
|
||||
name="progress-monitor",
|
||||
daemon=True,
|
||||
)
|
||||
monitor.start()
|
||||
|
||||
producer = threading.Thread(
|
||||
target=self._produce_chunks,
|
||||
args=(chunker, chunk_queue),
|
||||
@@ -185,6 +216,10 @@ class Pipeline:
|
||||
all_results = pool.wait()
|
||||
producer.join(timeout=5.0)
|
||||
|
||||
# Stop monitor
|
||||
monitor_stop.set()
|
||||
monitor.join(timeout=2.0)
|
||||
|
||||
# Stage 5: Collect results in order
|
||||
collector = ResultCollector(total_chunks)
|
||||
for r in all_results:
|
||||
|
||||
@@ -124,6 +124,7 @@ class Worker:
|
||||
self._emit("chunk_processing", {
|
||||
"sequence": chunk.sequence,
|
||||
"state": "processing",
|
||||
"queue_size": self.chunk_queue.qsize(),
|
||||
})
|
||||
|
||||
result = self._process_with_retry(chunk)
|
||||
@@ -135,6 +136,7 @@ class Worker:
|
||||
"success": result.success,
|
||||
"processing_time": result.processing_time,
|
||||
"retries": result.retries,
|
||||
"queue_size": self.chunk_queue.qsize(),
|
||||
})
|
||||
|
||||
self._emit("worker_status", {"state": "stopped"})
|
||||
|
||||
40
core/events.py
Normal file
40
core/events.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""
|
||||
Redis-based event bus for pipeline job progress.
|
||||
|
||||
Celery workers push events, SSE endpoints poll them.
|
||||
Only depends on redis — safe to import from any context.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
import redis
|
||||
|
||||
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
|
||||
|
||||
|
||||
def _get_redis():
|
||||
return redis.from_url(REDIS_URL, decode_responses=True)
|
||||
|
||||
|
||||
def push_event(job_id: str, event_type: str, data: dict) -> None:
|
||||
"""Push an event to the Redis list for a job."""
|
||||
r = _get_redis()
|
||||
key = f"chunk_events:{job_id}"
|
||||
event = json.dumps({"event": event_type, **data})
|
||||
r.rpush(key, event)
|
||||
r.expire(key, 3600)
|
||||
|
||||
|
||||
def poll_events(job_id: str, cursor: int = 0) -> tuple[list[dict], int]:
|
||||
"""Poll new events from Redis. Returns (events, new_cursor)."""
|
||||
r = _get_redis()
|
||||
key = f"chunk_events:{job_id}"
|
||||
raw_events = r.lrange(key, cursor, -1)
|
||||
parsed = []
|
||||
for raw in raw_events:
|
||||
try:
|
||||
parsed.append(json.loads(raw))
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
return parsed, cursor + len(raw_events)
|
||||
@@ -2,22 +2,24 @@
|
||||
ChunkHandler — job handler that wraps the chunker Pipeline.
|
||||
|
||||
Downloads source from S3/MinIO, runs FFmpeg chunking pipeline,
|
||||
uploads mp4 segments + manifest back to S3/MinIO.
|
||||
writes mp4 segments + manifest to media/out/chunks/{job_id}/.
|
||||
Pushes real-time events to Redis for SSE consumption.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from core.events import push_event as push_chunk_event
|
||||
from core.chunker import Pipeline
|
||||
from core.storage import BUCKET_IN, BUCKET_OUT, download_to_temp, upload_file
|
||||
from core.storage import BUCKET_IN, download_to_temp
|
||||
|
||||
from .base import Handler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MEDIA_OUT_DIR = os.environ.get("MEDIA_OUT_DIR", "/app/media/out")
|
||||
|
||||
|
||||
class ChunkHandler(Handler):
|
||||
"""
|
||||
@@ -44,14 +46,19 @@ class ChunkHandler(Handler):
|
||||
logger.info(f"ChunkHandler starting job {job_id}: {source_key}")
|
||||
|
||||
# Download source from S3/MinIO
|
||||
push_chunk_event(job_id, "pipeline_start", {"status": "downloading", "source_key": source_key})
|
||||
tmp_source = download_to_temp(BUCKET_IN, source_key)
|
||||
|
||||
# Create temp output directory for chunks
|
||||
tmp_output_dir = tempfile.mkdtemp(prefix=f"chunks-{job_id}-")
|
||||
# Output directory: media/out/chunks/{job_id}/
|
||||
output_dir = os.path.join(MEDIA_OUT_DIR, "chunks", job_id)
|
||||
if processor_type == "ffmpeg":
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
def event_bridge(event_type: str, data: Dict[str, Any]) -> None:
|
||||
"""Bridge pipeline events to the job progress callback."""
|
||||
"""Bridge pipeline events to Redis + optional progress callback."""
|
||||
push_chunk_event(job_id, event_type, data)
|
||||
|
||||
if progress_callback and event_type == "pipeline_complete":
|
||||
progress_callback(100, data)
|
||||
elif progress_callback and event_type == "chunk_done":
|
||||
@@ -68,29 +75,28 @@ class ChunkHandler(Handler):
|
||||
processor_type=processor_type,
|
||||
queue_size=payload.get("queue_size", 10),
|
||||
event_callback=event_bridge,
|
||||
output_dir=tmp_output_dir if processor_type == "ffmpeg" else None,
|
||||
output_dir=output_dir if processor_type == "ffmpeg" else None,
|
||||
start_time=payload.get("start_time"),
|
||||
end_time=payload.get("end_time"),
|
||||
)
|
||||
|
||||
result = pipeline.run()
|
||||
|
||||
# Upload chunks + manifest to S3/MinIO
|
||||
# Files are already in media/out/chunks/{job_id}/
|
||||
output_prefix = f"chunks/{job_id}"
|
||||
uploaded_files = []
|
||||
output_files = [
|
||||
f"{output_prefix}/{os.path.basename(f)}"
|
||||
for f in result.chunk_files
|
||||
]
|
||||
|
||||
for chunk_file in result.chunk_files:
|
||||
filename = os.path.basename(chunk_file)
|
||||
output_key = f"{output_prefix}/{filename}"
|
||||
upload_file(chunk_file, BUCKET_OUT, output_key)
|
||||
uploaded_files.append(output_key)
|
||||
logger.info(f"Uploaded {output_key}")
|
||||
|
||||
# Upload manifest
|
||||
manifest_path = os.path.join(tmp_output_dir, "manifest.json")
|
||||
if os.path.exists(manifest_path):
|
||||
manifest_key = f"{output_prefix}/manifest.json"
|
||||
upload_file(manifest_path, BUCKET_OUT, manifest_key)
|
||||
uploaded_files.append(manifest_key)
|
||||
logger.info(f"Uploaded {manifest_key}")
|
||||
push_chunk_event(job_id, "pipeline_complete", {
|
||||
"status": "completed",
|
||||
"total_chunks": result.total_chunks,
|
||||
"processed": result.processed,
|
||||
"failed": result.failed,
|
||||
"elapsed": result.elapsed_time,
|
||||
"throughput_mbps": result.throughput_mbps,
|
||||
})
|
||||
|
||||
return {
|
||||
"status": "completed" if result.failed == 0 else "completed_with_errors",
|
||||
@@ -104,16 +110,16 @@ class ChunkHandler(Handler):
|
||||
"errors": result.errors,
|
||||
"chunks_in_order": result.chunks_in_order,
|
||||
"output_prefix": output_prefix,
|
||||
"uploaded_files": uploaded_files,
|
||||
"output_files": output_files,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
push_chunk_event(job_id, "pipeline_error", {"status": "failed", "error": str(e)})
|
||||
raise
|
||||
|
||||
finally:
|
||||
# Cleanup temp files
|
||||
# Cleanup temp source file only (output dir is persistent)
|
||||
try:
|
||||
os.unlink(tmp_source)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
shutil.rmtree(tmp_output_dir, ignore_errors=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@@ -11,6 +11,7 @@ service WorkerService {
|
||||
rpc StreamProgress(ProgressRequest) returns (stream ProgressUpdate);
|
||||
rpc CancelJob(CancelRequest) returns (CancelResponse);
|
||||
rpc GetWorkerStatus(Empty) returns (WorkerStatus);
|
||||
rpc StreamChunkPipeline(ChunkStreamRequest) returns (stream ChunkPipelineEvent);
|
||||
}
|
||||
|
||||
message JobRequest {
|
||||
@@ -62,3 +63,24 @@ message WorkerStatus {
|
||||
message Empty {
|
||||
// Empty
|
||||
}
|
||||
|
||||
message ChunkStreamRequest {
|
||||
string job_id = 1;
|
||||
}
|
||||
|
||||
message ChunkPipelineEvent {
|
||||
string job_id = 1;
|
||||
string event_type = 2;
|
||||
int32 sequence = 3;
|
||||
string worker_id = 4;
|
||||
string state = 5;
|
||||
int32 queue_size = 6;
|
||||
float elapsed = 7;
|
||||
float throughput_mbps = 8;
|
||||
int32 total_chunks = 9;
|
||||
int32 processed_chunks = 10;
|
||||
int32 failed_chunks = 11;
|
||||
string error = 12;
|
||||
float processing_time = 13;
|
||||
int32 retries = 14;
|
||||
}
|
||||
|
||||
@@ -173,6 +173,43 @@ class WorkerServicer(worker_pb2_grpc.WorkerServiceServicer):
|
||||
message="Job not found",
|
||||
)
|
||||
|
||||
def StreamChunkPipeline(self, request, context) -> Iterator[worker_pb2.ChunkPipelineEvent]:
|
||||
"""Stream chunk pipeline events for a job."""
|
||||
from core.events import poll_events
|
||||
|
||||
job_id = request.job_id
|
||||
logger.info(f"StreamChunkPipeline: {job_id}")
|
||||
|
||||
cursor = 0
|
||||
timeout = time.monotonic() + 600 # 10 min max
|
||||
|
||||
while context.is_active() and time.monotonic() < timeout:
|
||||
events, cursor = poll_events(job_id, cursor)
|
||||
|
||||
for data in events:
|
||||
event_type = data.pop("event", "")
|
||||
yield worker_pb2.ChunkPipelineEvent(
|
||||
job_id=job_id,
|
||||
event_type=event_type,
|
||||
sequence=data.get("sequence", 0),
|
||||
worker_id=data.get("worker_id", ""),
|
||||
state=data.get("state", ""),
|
||||
queue_size=data.get("queue_size", 0),
|
||||
elapsed=data.get("elapsed", 0.0),
|
||||
throughput_mbps=data.get("throughput_mbps", 0.0),
|
||||
total_chunks=data.get("total_chunks", 0),
|
||||
processed_chunks=data.get("processed_chunks", 0),
|
||||
failed_chunks=data.get("failed_chunks", 0),
|
||||
error=data.get("error", ""),
|
||||
processing_time=data.get("processing_time", 0.0),
|
||||
retries=data.get("retries", 0),
|
||||
)
|
||||
|
||||
if event_type in ("pipeline_complete", "pipeline_error"):
|
||||
return
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
def GetWorkerStatus(self, request, context):
|
||||
"""Get worker health and capabilities."""
|
||||
try:
|
||||
|
||||
@@ -24,7 +24,7 @@ _sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cworker.proto\x12\nmpr.worker\"\xa7\x01\n\nJobRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x13\n\x0bsource_path\x18\x02 \x01(\t\x12\x13\n\x0boutput_path\x18\x03 \x01(\t\x12\x13\n\x0bpreset_json\x18\x04 \x01(\t\x12\x17\n\ntrim_start\x18\x05 \x01(\x02H\x00\x88\x01\x01\x12\x15\n\x08trim_end\x18\x06 \x01(\x02H\x01\x88\x01\x01\x42\r\n\x0b_trim_startB\x0b\n\t_trim_end\"@\n\x0bJobResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08\x61\x63\x63\x65pted\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"!\n\x0fProgressRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\x9c\x01\n\x0eProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08progress\x18\x02 \x01(\x05\x12\x15\n\rcurrent_frame\x18\x03 \x01(\x05\x12\x14\n\x0c\x63urrent_time\x18\x04 \x01(\x02\x12\r\n\x05speed\x18\x05 \x01(\x02\x12\x0e\n\x06status\x18\x06 \x01(\t\x12\x12\n\x05\x65rror\x18\x07 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x1f\n\rCancelRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"D\n\x0e\x43\x61ncelResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x11\n\tcancelled\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"g\n\x0cWorkerStatus\x12\x11\n\tavailable\x18\x01 \x01(\x08\x12\x13\n\x0b\x61\x63tive_jobs\x18\x02 \x01(\x05\x12\x18\n\x10supported_codecs\x18\x03 \x03(\t\x12\x15\n\rgpu_available\x18\x04 \x01(\x08\"\x07\n\x05\x45mpty2\x9e\x02\n\rWorkerService\x12<\n\tSubmitJob\x12\x16.mpr.worker.JobRequest\x1a\x17.mpr.worker.JobResponse\x12K\n\x0eStreamProgress\x12\x1b.mpr.worker.ProgressRequest\x1a\x1a.mpr.worker.ProgressUpdate0\x01\x12\x42\n\tCancelJob\x12\x19.mpr.worker.CancelRequest\x1a\x1a.mpr.worker.CancelResponse\x12>\n\x0fGetWorkerStatus\x12\x11.mpr.worker.Empty\x1a\x18.mpr.worker.WorkerStatusb\x06proto3')
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cworker.proto\x12\nmpr.worker\"\xa7\x01\n\nJobRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x13\n\x0bsource_path\x18\x02 \x01(\t\x12\x13\n\x0boutput_path\x18\x03 \x01(\t\x12\x13\n\x0bpreset_json\x18\x04 \x01(\t\x12\x17\n\ntrim_start\x18\x05 \x01(\x02H\x00\x88\x01\x01\x12\x15\n\x08trim_end\x18\x06 \x01(\x02H\x01\x88\x01\x01\x42\r\n\x0b_trim_startB\x0b\n\t_trim_end\"@\n\x0bJobResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08\x61\x63\x63\x65pted\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"!\n\x0fProgressRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\x9c\x01\n\x0eProgressUpdate\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x10\n\x08progress\x18\x02 \x01(\x05\x12\x15\n\rcurrent_frame\x18\x03 \x01(\x05\x12\x14\n\x0c\x63urrent_time\x18\x04 \x01(\x02\x12\r\n\x05speed\x18\x05 \x01(\x02\x12\x0e\n\x06status\x18\x06 \x01(\t\x12\x12\n\x05\x65rror\x18\x07 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x1f\n\rCancelRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"D\n\x0e\x43\x61ncelResponse\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x11\n\tcancelled\x18\x02 \x01(\x08\x12\x0f\n\x07message\x18\x03 \x01(\t\"g\n\x0cWorkerStatus\x12\x11\n\tavailable\x18\x01 \x01(\x08\x12\x13\n\x0b\x61\x63tive_jobs\x18\x02 \x01(\x05\x12\x18\n\x10supported_codecs\x18\x03 \x03(\t\x12\x15\n\rgpu_available\x18\x04 \x01(\x08\"\x07\n\x05\x45mpty\"$\n\x12\x43hunkStreamRequest\x12\x0e\n\x06job_id\x18\x01 \x01(\t\"\xaa\x02\n\x12\x43hunkPipelineEvent\x12\x0e\n\x06job_id\x18\x01 \x01(\t\x12\x12\n\nevent_type\x18\x02 \x01(\t\x12\x10\n\x08sequence\x18\x03 \x01(\x05\x12\x11\n\tworker_id\x18\x04 \x01(\t\x12\r\n\x05state\x18\x05 \x01(\t\x12\x12\n\nqueue_size\x18\x06 \x01(\x05\x12\x0f\n\x07\x65lapsed\x18\x07 \x01(\x02\x12\x17\n\x0fthroughput_mbps\x18\x08 \x01(\x02\x12\x14\n\x0ctotal_chunks\x18\t \x01(\x05\x12\x18\n\x10processed_chunks\x18\n \x01(\x05\x12\x15\n\rfailed_chunks\x18\x0b \x01(\x05\x12\r\n\x05\x65rror\x18\x0c \x01(\t\x12\x17\n\x0fprocessing_time\x18\r \x01(\x02\x12\x0f\n\x07retries\x18\x0e \x01(\x05\x32\xf7\x02\n\rWorkerService\x12<\n\tSubmitJob\x12\x16.mpr.worker.JobRequest\x1a\x17.mpr.worker.JobResponse\x12K\n\x0eStreamProgress\x12\x1b.mpr.worker.ProgressRequest\x1a\x1a.mpr.worker.ProgressUpdate0\x01\x12\x42\n\tCancelJob\x12\x19.mpr.worker.CancelRequest\x1a\x1a.mpr.worker.CancelResponse\x12>\n\x0fGetWorkerStatus\x12\x11.mpr.worker.Empty\x1a\x18.mpr.worker.WorkerStatus\x12W\n\x13StreamChunkPipeline\x12\x1e.mpr.worker.ChunkStreamRequest\x1a\x1e.mpr.worker.ChunkPipelineEvent0\x01\x62\x06proto3')
|
||||
|
||||
_globals = globals()
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
|
||||
@@ -47,6 +47,10 @@ if not _descriptor._USE_C_DESCRIPTORS:
|
||||
_globals['_WORKERSTATUS']._serialized_end=664
|
||||
_globals['_EMPTY']._serialized_start=666
|
||||
_globals['_EMPTY']._serialized_end=673
|
||||
_globals['_WORKERSERVICE']._serialized_start=676
|
||||
_globals['_WORKERSERVICE']._serialized_end=962
|
||||
_globals['_CHUNKSTREAMREQUEST']._serialized_start=675
|
||||
_globals['_CHUNKSTREAMREQUEST']._serialized_end=711
|
||||
_globals['_CHUNKPIPELINEEVENT']._serialized_start=714
|
||||
_globals['_CHUNKPIPELINEEVENT']._serialized_end=1012
|
||||
_globals['_WORKERSERVICE']._serialized_start=1015
|
||||
_globals['_WORKERSERVICE']._serialized_end=1390
|
||||
# @@protoc_insertion_point(module_scope)
|
||||
|
||||
@@ -5,7 +5,7 @@ import warnings
|
||||
|
||||
from . import worker_pb2 as worker__pb2
|
||||
|
||||
GRPC_GENERATED_VERSION = '1.76.0'
|
||||
GRPC_GENERATED_VERSION = '1.78.0'
|
||||
GRPC_VERSION = grpc.__version__
|
||||
_version_not_supported = False
|
||||
|
||||
@@ -54,6 +54,11 @@ class WorkerServiceStub(object):
|
||||
request_serializer=worker__pb2.Empty.SerializeToString,
|
||||
response_deserializer=worker__pb2.WorkerStatus.FromString,
|
||||
_registered_method=True)
|
||||
self.StreamChunkPipeline = channel.unary_stream(
|
||||
'/mpr.worker.WorkerService/StreamChunkPipeline',
|
||||
request_serializer=worker__pb2.ChunkStreamRequest.SerializeToString,
|
||||
response_deserializer=worker__pb2.ChunkPipelineEvent.FromString,
|
||||
_registered_method=True)
|
||||
|
||||
|
||||
class WorkerServiceServicer(object):
|
||||
@@ -83,6 +88,12 @@ class WorkerServiceServicer(object):
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
def StreamChunkPipeline(self, request, context):
|
||||
"""Missing associated documentation comment in .proto file."""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
raise NotImplementedError('Method not implemented!')
|
||||
|
||||
|
||||
def add_WorkerServiceServicer_to_server(servicer, server):
|
||||
rpc_method_handlers = {
|
||||
@@ -106,6 +117,11 @@ def add_WorkerServiceServicer_to_server(servicer, server):
|
||||
request_deserializer=worker__pb2.Empty.FromString,
|
||||
response_serializer=worker__pb2.WorkerStatus.SerializeToString,
|
||||
),
|
||||
'StreamChunkPipeline': grpc.unary_stream_rpc_method_handler(
|
||||
servicer.StreamChunkPipeline,
|
||||
request_deserializer=worker__pb2.ChunkStreamRequest.FromString,
|
||||
response_serializer=worker__pb2.ChunkPipelineEvent.SerializeToString,
|
||||
),
|
||||
}
|
||||
generic_handler = grpc.method_handlers_generic_handler(
|
||||
'mpr.worker.WorkerService', rpc_method_handlers)
|
||||
@@ -224,3 +240,30 @@ class WorkerService(object):
|
||||
timeout,
|
||||
metadata,
|
||||
_registered_method=True)
|
||||
|
||||
@staticmethod
|
||||
def StreamChunkPipeline(request,
|
||||
target,
|
||||
options=(),
|
||||
channel_credentials=None,
|
||||
call_credentials=None,
|
||||
insecure=False,
|
||||
compression=None,
|
||||
wait_for_ready=None,
|
||||
timeout=None,
|
||||
metadata=None):
|
||||
return grpc.experimental.unary_stream(
|
||||
request,
|
||||
target,
|
||||
'/mpr.worker.WorkerService/StreamChunkPipeline',
|
||||
worker__pb2.ChunkStreamRequest.SerializeToString,
|
||||
worker__pb2.ChunkPipelineEvent.FromString,
|
||||
options,
|
||||
channel_credentials,
|
||||
insecure,
|
||||
call_credentials,
|
||||
compression,
|
||||
wait_for_ready,
|
||||
timeout,
|
||||
metadata,
|
||||
_registered_method=True)
|
||||
|
||||
@@ -13,8 +13,8 @@
|
||||
},
|
||||
{
|
||||
"target": "typescript",
|
||||
"output": "ui/timeline/src/types.ts",
|
||||
"include": ["dataclasses", "enums", "api"]
|
||||
"output": "ui/common/types/generated.ts",
|
||||
"include": ["dataclasses", "enums", "api", "views"]
|
||||
},
|
||||
{
|
||||
"target": "protobuf",
|
||||
|
||||
@@ -16,6 +16,8 @@ from .grpc import (
|
||||
GRPC_SERVICE,
|
||||
CancelRequest,
|
||||
CancelResponse,
|
||||
ChunkPipelineEvent,
|
||||
ChunkStreamRequest,
|
||||
Empty,
|
||||
JobRequest,
|
||||
JobResponse,
|
||||
@@ -26,6 +28,7 @@ from .grpc import (
|
||||
from .jobs import ChunkJob, ChunkJobStatus, JobStatus, TranscodeJob
|
||||
from .media import AssetStatus, MediaAsset
|
||||
from .presets import BUILTIN_PRESETS, TranscodePreset
|
||||
from .views import ChunkEvent, ChunkOutputFile, PipelineStats, WorkerEvent
|
||||
|
||||
# Core domain models - generates Django, Pydantic, TypeScript
|
||||
DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob, ChunkJob]
|
||||
@@ -44,6 +47,9 @@ API_MODELS = [
|
||||
# Status enums - included in generated code
|
||||
ENUMS = [AssetStatus, JobStatus, ChunkJobStatus]
|
||||
|
||||
# View/event models - generates TypeScript for UI consumption
|
||||
VIEWS = [ChunkEvent, WorkerEvent, PipelineStats, ChunkOutputFile]
|
||||
|
||||
# gRPC messages - generates Proto
|
||||
GRPC_MESSAGES = [
|
||||
JobRequest,
|
||||
@@ -54,6 +60,8 @@ GRPC_MESSAGES = [
|
||||
CancelResponse,
|
||||
WorkerStatus,
|
||||
Empty,
|
||||
ChunkStreamRequest,
|
||||
ChunkPipelineEvent,
|
||||
]
|
||||
|
||||
__all__ = [
|
||||
@@ -82,10 +90,18 @@ __all__ = [
|
||||
"CancelResponse",
|
||||
"WorkerStatus",
|
||||
"Empty",
|
||||
"ChunkStreamRequest",
|
||||
"ChunkPipelineEvent",
|
||||
# Views
|
||||
"ChunkEvent",
|
||||
"WorkerEvent",
|
||||
"PipelineStats",
|
||||
"ChunkOutputFile",
|
||||
# For generator
|
||||
"DATACLASSES",
|
||||
"API_MODELS",
|
||||
"ENUMS",
|
||||
"VIEWS",
|
||||
"GRPC_MESSAGES",
|
||||
"BUILTIN_PRESETS",
|
||||
]
|
||||
|
||||
@@ -41,6 +41,13 @@ class CancelRequest:
|
||||
job_id: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChunkStreamRequest:
|
||||
"""Request to stream chunk pipeline events."""
|
||||
|
||||
job_id: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Empty:
|
||||
"""Empty message for requests with no parameters."""
|
||||
@@ -94,6 +101,26 @@ class WorkerStatus:
|
||||
gpu_available: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChunkPipelineEvent:
|
||||
"""Streaming chunk pipeline event."""
|
||||
|
||||
job_id: str
|
||||
event_type: str # pipeline_start, chunk_queued, chunk_done, etc.
|
||||
sequence: int = 0
|
||||
worker_id: str = ""
|
||||
state: str = ""
|
||||
queue_size: int = 0
|
||||
elapsed: float = 0.0
|
||||
throughput_mbps: float = 0.0
|
||||
total_chunks: int = 0
|
||||
processed_chunks: int = 0
|
||||
failed_chunks: int = 0
|
||||
error: str = ""
|
||||
processing_time: float = 0.0
|
||||
retries: int = 0
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Service Definition (for documentation, generator uses this)
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -126,5 +153,11 @@ GRPC_SERVICE = {
|
||||
"response": WorkerStatus,
|
||||
"stream_response": False,
|
||||
},
|
||||
{
|
||||
"name": "StreamChunkPipeline",
|
||||
"request": ChunkStreamRequest,
|
||||
"response": ChunkPipelineEvent,
|
||||
"stream_response": True, # Server streaming
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
57
core/schema/models/views.py
Normal file
57
core/schema/models/views.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""
|
||||
View/Event Schema Definitions
|
||||
|
||||
Projections of domain models for UI consumption via SSE events.
|
||||
These reference existing schema types (e.g., ChunkJobStatus) to maintain
|
||||
type-level dependencies — if the domain model changes, views update too.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChunkEvent:
|
||||
"""SSE event for a single chunk's lifecycle."""
|
||||
|
||||
sequence: int
|
||||
status: str
|
||||
size: Optional[int] = None
|
||||
worker_id: Optional[str] = None
|
||||
processing_time: Optional[float] = None
|
||||
error: Optional[str] = None
|
||||
retries: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class WorkerEvent:
|
||||
"""SSE event for worker state changes."""
|
||||
|
||||
worker_id: str
|
||||
state: str
|
||||
current_chunk: Optional[int] = None
|
||||
processed: int = 0
|
||||
errors: int = 0
|
||||
retries: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineStats:
|
||||
"""Aggregate pipeline statistics, updated via SSE."""
|
||||
|
||||
total_chunks: int = 0
|
||||
processed: int = 0
|
||||
failed: int = 0
|
||||
retries: int = 0
|
||||
elapsed: float = 0.0
|
||||
throughput_mbps: float = 0.0
|
||||
queue_size: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChunkOutputFile:
|
||||
"""A chunk output file in S3/MinIO with presigned download URL."""
|
||||
|
||||
key: str
|
||||
size: int = 0
|
||||
url: str = ""
|
||||
Reference in New Issue
Block a user