diff --git a/docs/architecture/01-architecture.dot b/docs/architecture/01-architecture.dot new file mode 100644 index 0000000..fd8ee9d --- /dev/null +++ b/docs/architecture/01-architecture.dot @@ -0,0 +1,75 @@ +digraph mpr_architecture { + rankdir=LR + bgcolor="#0a0e17" + fontname="Helvetica" + node [fontname="Helvetica" fontsize=11 style=filled color="#1e2a4a" fontcolor="#e8eaf0" shape=box] + edge [fontname="Helvetica" fontsize=9 fontcolor="#8892a8" color="#4a5568"] + + label="System Architecture" + labelloc=t + fontsize=16 + fontcolor="#0066ff" + + subgraph cluster_browser { + label="Browser" + style=dashed + color="#1e2a4a" + fontcolor="#8892a8" + + ui [label="detection-app\n(Vue 3 + @vue-flow)" fillcolor="#121829"] + wasm [label="OpenCV WASM\n(edge / field stages)" fillcolor="#1a1a3a" fontcolor="#0066ff"] + chunker [label="chunker UI\n(standalone test util)" fillcolor="#121829" fontcolor="#8892a8"] + } + + subgraph cluster_k8s { + label="K8s cluster (Kind in dev)" + style=dashed + color="#0066ff" + fontcolor="#0066ff" + + gateway [label="Envoy Gateway\nport 8080" fillcolor="#0d1a33" shape=octagon] + ui_pod [label="detection-ui pod\n(Vite :5175)" fillcolor="#121829"] + api [label="FastAPI\n:8702 /detect/*" fillcolor="#121829"] + + subgraph cluster_data { + label="Data plane" + style=dashed + color="#1e2a4a" + fontcolor="#4a5568" + + pg [label="PostgreSQL\njobs · profiles\ncheckpoints" fillcolor="#121829" shape=cylinder] + redis [label="Redis\n(SSE fan-out)" fillcolor="#121829" shape=cylinder] + minio [label="MinIO\nmedia · overlays" fillcolor="#121829" shape=cylinder] + } + } + + subgraph cluster_gpu { + label="GPU host (LAN)" + style=dashed + color="#1e2a4a" + fontcolor="#8892a8" + + gpu [label="inference server\nYOLO · OCR · VLM\nedge · field segmentation" fillcolor="#1a3a1a" fontcolor="#00c853" shape=box] + } + + subgraph cluster_cloud { + label="Cloud VLM providers" + style=dashed + color="#1e2a4a" + fontcolor="#8892a8" + + cloud [label="Anthropic · Gemini\nOpenAI · Groq" fillcolor="#243056" shape=octagon] + } + + ui -> gateway [label="HTTP / SSE"] + chunker -> gateway [label="HTTP"] + ui -> wasm [label="worker" color="#0066ff"] + gateway -> ui_pod [label="/ /detection/*"] + gateway -> api [label="/api/*\n/api/detect/stream/*" color="#0066ff"] + api -> pg + api -> redis [label="publish events" style=dotted] + api -> minio [label="frames · overlays"] + api -> gpu [label="HTTP\nINFERENCE_URL" color="#00c853"] + api -> cloud [label="VLM escalation" style=dashed] + redis -> api [label="SSE consumer" style=dashed color="#8892a8"] +} diff --git a/docs/architecture/01-architecture.svg b/docs/architecture/01-architecture.svg new file mode 100644 index 0000000..0546aa2 --- /dev/null +++ b/docs/architecture/01-architecture.svg @@ -0,0 +1,199 @@ + + + + + + +mpr_architecture + +System Architecture + +cluster_browser + +Browser + + +cluster_k8s + +K8s cluster (Kind in dev) + + +cluster_data + +Data plane + + +cluster_gpu + +GPU host (LAN) + + +cluster_cloud + +Cloud VLM providers + + + +ui + +detection-app +(Vue 3 + @vue-flow) + + + +wasm + +OpenCV WASM +(edge / field stages) + + + +ui->wasm + + +worker + + + +gateway + +Envoy Gateway +port 8080 + + + +ui->gateway + + +HTTP / SSE + + + +chunker + +chunker UI +(standalone test util) + + + +chunker->gateway + + +HTTP + + + +ui_pod + +detection-ui pod +(Vite :5175) + + + +gateway->ui_pod + + +/  /detection/* + + + +api + +FastAPI +:8702 /detect/* + + + +gateway->api + + +/api/* +/api/detect/stream/* + + + +pg + + +PostgreSQL +jobs · profiles 
+checkpoints + + + +api->pg + + + + + +redis + + +Redis +(SSE fan-out) + + + +api->redis + + +publish events + + + +minio + + +MinIO +media · overlays + + + +api->minio + + +frames · overlays + + + +gpu + +inference server +YOLO · OCR · VLM +edge · field segmentation + + + +api->gpu + + +HTTP +INFERENCE_URL + + + +cloud + +Anthropic · Gemini +OpenAI · Groq + + + +api->cloud + + +VLM escalation + + + +redis->api + + +SSE consumer + + + diff --git a/docs/architecture/01a-local-architecture.dot b/docs/architecture/01a-local-architecture.dot deleted file mode 100644 index f5ab951..0000000 --- a/docs/architecture/01a-local-architecture.dot +++ /dev/null @@ -1,94 +0,0 @@ -digraph local_architecture { - rankdir=TB - node [shape=box, style=rounded, fontname="Helvetica"] - edge [fontname="Helvetica", fontsize=10] - - labelloc="t" - label="MPR - Local Architecture (Celery + MinIO)" - fontsize=16 - fontname="Helvetica-Bold" - - graph [splines=ortho, nodesep=0.8, ranksep=0.8] - - // External - subgraph cluster_external { - label="External" - style=dashed - color=gray - - browser [label="Browser\nmpr.local.ar", shape=ellipse] - } - - // Nginx reverse proxy - subgraph cluster_proxy { - label="Reverse Proxy" - style=filled - fillcolor="#e8f4f8" - - nginx [label="nginx\nport 80"] - } - - // Application layer - subgraph cluster_apps { - label="Application Layer" - style=filled - fillcolor="#f0f8e8" - - django [label="Django Admin\n/admin\nport 8701"] - fastapi [label="GraphQL API\n/graphql\nport 8702"] - timeline [label="Timeline UI\n/\nport 5173"] - } - - // Worker layer - subgraph cluster_workers { - label="Worker Layer" - style=filled - fillcolor="#fff8e8" - - grpc_server [label="gRPC Server\nport 50051"] - celery [label="Celery Worker\nFFmpeg transcoding"] - } - - // Data layer - subgraph cluster_data { - label="Data Layer" - style=filled - fillcolor="#f8e8f0" - - postgres [label="PostgreSQL\nport 5436", shape=cylinder] - redis [label="Redis\nCelery queue\nport 6381", shape=cylinder] - } - - // Storage - subgraph cluster_storage { - label="S3 Storage (MinIO)" - style=filled - fillcolor="#f0f0f0" - - minio [label="MinIO\nS3-compatible API\nport 9000", shape=folder] - bucket_in [label="mpr-media-in/\ninput videos", shape=note] - bucket_out [label="mpr-media-out/\ntranscoded output", shape=note] - } - - // Connections - browser -> nginx [label="HTTP"] - - nginx -> django [xlabel="/admin"] - nginx -> fastapi [xlabel="/graphql"] - nginx -> timeline [xlabel="/"] - nginx -> minio [xlabel="/media/*"] - - timeline -> fastapi [label="GraphQL"] - django -> postgres - - fastapi -> postgres [label="read/write jobs"] - fastapi -> grpc_server [label="gRPC\nprogress updates"] - - grpc_server -> celery [label="dispatch tasks"] - celery -> redis [label="task queue"] - celery -> postgres [label="update job status"] - celery -> minio [label="S3 API\ndownload input\nupload output"] - - minio -> bucket_in [style=dotted, arrowhead=none] - minio -> bucket_out [style=dotted, arrowhead=none] -} diff --git a/docs/architecture/01a-local-architecture.svg b/docs/architecture/01a-local-architecture.svg deleted file mode 100644 index d0bec4f..0000000 --- a/docs/architecture/01a-local-architecture.svg +++ /dev/null @@ -1,242 +0,0 @@ - - - - - - -local_architecture - -MPR - Local Architecture (Celery + MinIO) - -cluster_external - -External - - -cluster_proxy - -Reverse Proxy - - -cluster_apps - -Application Layer - - -cluster_workers - -Worker Layer - - -cluster_data - -Data Layer - - -cluster_storage - -S3 Storage (MinIO) - - - 
-browser - -Browser -mpr.local.ar - - - -nginx - -nginx -port 80 - - - -browser->nginx - - -HTTP - - - -django - -Django Admin -/admin -port 8701 - - - -nginx->django - - -/admin - - - -fastapi - -GraphQL API -/graphql -port 8702 - - - -nginx->fastapi - - -/graphql - - - -timeline - -Timeline UI -/ -port 5173 - - - -nginx->timeline - - -/ - - - -minio - -MinIO -S3-compatible API -port 9000 - - - -nginx->minio - - -/media/* - - - -postgres - - -PostgreSQL -port 5436 - - - -django->postgres - - - - - -grpc_server - -gRPC Server -port 50051 - - - -fastapi->grpc_server - - -gRPC -progress updates - - - -fastapi->postgres - - -read/write jobs - - - -timeline->fastapi - - -GraphQL - - - -celery - -Celery Worker -FFmpeg transcoding - - - -grpc_server->celery - - -dispatch tasks - - - -celery->postgres - - -update job status - - - -redis - - -Redis -Celery queue -port 6381 - - - -celery->redis - - -task queue - - - -celery->minio - - -S3 API -download input -upload output - - - -bucket_in - - - -mpr-media-in/ -input videos - - - -minio->bucket_in - - - - -bucket_out - - - -mpr-media-out/ -transcoded output - - - -minio->bucket_out - - - - diff --git a/docs/architecture/01b-aws-architecture.dot b/docs/architecture/01b-aws-architecture.dot deleted file mode 100644 index 397856b..0000000 --- a/docs/architecture/01b-aws-architecture.dot +++ /dev/null @@ -1,85 +0,0 @@ -digraph aws_architecture { - rankdir=TB - node [shape=box, style=rounded, fontname="Helvetica"] - edge [fontname="Helvetica", fontsize=10] - - labelloc="t" - label="MPR - AWS Architecture (Lambda + Step Functions)" - fontsize=16 - fontname="Helvetica-Bold" - - graph [splines=ortho, nodesep=0.8, ranksep=0.8] - - // External - subgraph cluster_external { - label="External" - style=dashed - color=gray - - browser [label="Browser\nmpr.mcrn.ar", shape=ellipse] - } - - // Nginx reverse proxy - subgraph cluster_proxy { - label="Reverse Proxy" - style=filled - fillcolor="#e8f4f8" - - nginx [label="nginx\nport 80"] - } - - // Application layer - subgraph cluster_apps { - label="Application Layer" - style=filled - fillcolor="#f0f8e8" - - django [label="Django Admin\n/admin\nport 8701"] - fastapi [label="GraphQL API\n/graphql\nport 8702"] - timeline [label="Timeline UI\n/\nport 5173"] - } - - // Data layer (still local) - subgraph cluster_data { - label="Data Layer" - style=filled - fillcolor="#f8e8f0" - - postgres [label="PostgreSQL\nport 5436", shape=cylinder] - } - - // AWS layer - subgraph cluster_aws { - label="AWS Cloud" - style=filled - fillcolor="#fde8d0" - - step_functions [label="Step Functions\nOrchestration\nstate machine"] - lambda [label="Lambda Function\nFFmpeg container\ntranscoding"] - s3 [label="S3 Buckets", shape=folder] - bucket_in [label="mpr-media-in/\ninput videos", shape=note] - bucket_out [label="mpr-media-out/\ntranscoded output", shape=note] - } - - // Connections - browser -> nginx [label="HTTP"] - - nginx -> django [xlabel="/admin"] - nginx -> fastapi [xlabel="/graphql"] - nginx -> timeline [xlabel="/"] - - timeline -> fastapi [label="GraphQL"] - django -> postgres - - fastapi -> postgres [label="read/write jobs"] - fastapi -> step_functions [label="boto3\nstart_execution()\nexecution_arn"] - - step_functions -> lambda [label="invoke with\njob parameters"] - lambda -> s3 [label="download input\nupload output"] - lambda -> fastapi [label="POST /jobs/{id}/callback\nupdate status"] - - fastapi -> postgres [label="callback updates\njob status"] - - s3 -> bucket_in [style=dotted, arrowhead=none] - s3 -> bucket_out 
[style=dotted, arrowhead=none] -} diff --git a/docs/architecture/01b-aws-architecture.svg b/docs/architecture/01b-aws-architecture.svg deleted file mode 100644 index 8274427..0000000 --- a/docs/architecture/01b-aws-architecture.svg +++ /dev/null @@ -1,224 +0,0 @@ - - - - - - -aws_architecture - -MPR - AWS Architecture (Lambda + Step Functions) - -cluster_external - -External - - -cluster_proxy - -Reverse Proxy - - -cluster_apps - -Application Layer - - -cluster_data - -Data Layer - - -cluster_aws - -AWS Cloud - - - -browser - -Browser -mpr.mcrn.ar - - - -nginx - -nginx -port 80 - - - -browser->nginx - - -HTTP - - - -django - -Django Admin -/admin -port 8701 - - - -nginx->django - - -/admin - - - -fastapi - -GraphQL API -/graphql -port 8702 - - - -nginx->fastapi - - -/graphql - - - -timeline - -Timeline UI -/ -port 5173 - - - -nginx->timeline - - -/ - - - -postgres - - -PostgreSQL -port 5436 - - - -django->postgres - - - - - -fastapi->postgres - - -read/write jobs - - - -fastapi->postgres - - -callback updates -job status - - - -step_functions - -Step Functions -Orchestration -state machine - - - -fastapi->step_functions - - -boto3 -start_execution() -execution_arn - - - -timeline->fastapi - - -GraphQL - - - -lambda - -Lambda Function -FFmpeg container -transcoding - - - -step_functions->lambda - - -invoke with -job parameters - - - -lambda->fastapi - - -POST /jobs/{id}/callback -update status - - - -s3 - -S3 Buckets - - - -lambda->s3 - - -download input -upload output - - - -bucket_in - - - -mpr-media-in/ -input videos - - - -s3->bucket_in - - - - -bucket_out - - - -mpr-media-out/ -transcoded output - - - -s3->bucket_out - - - - diff --git a/docs/architecture/01c-gcp-architecture.dot b/docs/architecture/01c-gcp-architecture.dot deleted file mode 100644 index 8fc866c..0000000 --- a/docs/architecture/01c-gcp-architecture.dot +++ /dev/null @@ -1,83 +0,0 @@ -digraph gcp_architecture { - rankdir=TB - node [shape=box, style=rounded, fontname="Helvetica"] - edge [fontname="Helvetica", fontsize=10] - - labelloc="t" - label="MPR - GCP Architecture (Cloud Run Jobs + GCS)" - fontsize=16 - fontname="Helvetica-Bold" - - graph [splines=ortho, nodesep=0.8, ranksep=0.8] - - // External - subgraph cluster_external { - label="External" - style=dashed - color=gray - - browser [label="Browser\nmpr.mcrn.ar", shape=ellipse] - } - - // Nginx reverse proxy - subgraph cluster_proxy { - label="Reverse Proxy" - style=filled - fillcolor="#e8f4f8" - - nginx [label="nginx\nport 80"] - } - - // Application layer - subgraph cluster_apps { - label="Application Layer" - style=filled - fillcolor="#f0f8e8" - - django [label="Django Admin\n/admin\nport 8701"] - fastapi [label="GraphQL API\n/graphql\nport 8702"] - timeline [label="Timeline UI\n/\nport 5173"] - } - - // Data layer (still local) - subgraph cluster_data { - label="Data Layer" - style=filled - fillcolor="#f8e8f0" - - postgres [label="PostgreSQL\nport 5436", shape=cylinder] - } - - // GCP layer - subgraph cluster_gcp { - label="Google Cloud" - style=filled - fillcolor="#e8f0fd" - - cloud_run_job [label="Cloud Run Job\nFFmpeg container\ntranscoding"] - gcs [label="GCS Buckets\n(S3-compat API)", shape=folder] - bucket_in [label="mpr-media-in/\ninput videos", shape=note] - bucket_out [label="mpr-media-out/\ntranscoded output", shape=note] - } - - // Connections - browser -> nginx [label="HTTP"] - - nginx -> django [xlabel="/admin"] - nginx -> fastapi [xlabel="/graphql"] - nginx -> timeline [xlabel="/"] - - timeline -> fastapi [label="GraphQL"] - django -> postgres - - 
fastapi -> postgres [label="read/write jobs"] - fastapi -> cloud_run_job [label="google-cloud-run\nrun_job() + payload\nexecution_name"] - - cloud_run_job -> gcs [label="S3 compat (HMAC)\ndownload input\nupload output"] - cloud_run_job -> fastapi [label="POST /jobs/{id}/callback\nupdate status"] - - fastapi -> postgres [label="callback updates\njob status"] - - gcs -> bucket_in [style=dotted, arrowhead=none] - gcs -> bucket_out [style=dotted, arrowhead=none] -} diff --git a/docs/architecture/01c-gcp-architecture.svg b/docs/architecture/01c-gcp-architecture.svg deleted file mode 100644 index 9f24e4d..0000000 --- a/docs/architecture/01c-gcp-architecture.svg +++ /dev/null @@ -1,210 +0,0 @@ - - - - - - -gcp_architecture - -MPR - GCP Architecture (Cloud Run Jobs + GCS) - -cluster_external - -External - - -cluster_proxy - -Reverse Proxy - - -cluster_apps - -Application Layer - - -cluster_data - -Data Layer - - -cluster_gcp - -Google Cloud - - - -browser - -Browser -mpr.mcrn.ar - - - -nginx - -nginx -port 80 - - - -browser->nginx - - -HTTP - - - -django - -Django Admin -/admin -port 8701 - - - -nginx->django - - -/admin - - - -fastapi - -GraphQL API -/graphql -port 8702 - - - -nginx->fastapi - - -/graphql - - - -timeline - -Timeline UI -/ -port 5173 - - - -nginx->timeline - - -/ - - - -postgres - - -PostgreSQL -port 5436 - - - -django->postgres - - - - - -fastapi->postgres - - -read/write jobs - - - -fastapi->postgres - - -callback updates -job status - - - -cloud_run_job - -Cloud Run Job -FFmpeg container -transcoding - - - -fastapi->cloud_run_job - - -google-cloud-run -run_job() + payload -execution_name - - - -timeline->fastapi - - -GraphQL - - - -cloud_run_job->fastapi - - -POST /jobs/{id}/callback -update status - - - -gcs - -GCS Buckets -(S3-compat API) - - - -cloud_run_job->gcs - - -S3 compat (HMAC) -download input -upload output - - - -bucket_in - - - -mpr-media-in/ -input videos - - - -gcs->bucket_in - - - - -bucket_out - - - -mpr-media-out/ -transcoded output - - - -gcs->bucket_out - - - - diff --git a/docs/architecture/02-data-model.dot b/docs/architecture/02-data-model.dot index 37fefe8..71f3da3 100644 --- a/docs/architecture/02-data-model.dot +++ b/docs/architecture/02-data-model.dot @@ -1,22 +1,99 @@ digraph data_model { rankdir=LR - node [shape=record, fontname="Helvetica", fontsize=11] - edge [fontname="Helvetica", fontsize=10] + bgcolor="#0a0e17" + fontname="Helvetica" + node [fontname="Helvetica" fontsize=11 shape=plaintext] + edge [fontname="Helvetica" fontsize=9 fontcolor="#8892a8" color="#4a5568"] - labelloc="t" - label="MPR - Data Model" + label="Data Model" + labelloc=t fontsize=16 - fontname="Helvetica-Bold" + fontcolor="#0066ff" - graph [splines=ortho, nodesep=0.6, ranksep=1.2] + MediaAsset [label=< + + + + + + +
MediaAsset
idUUID PK
filenamestr
file_pathstr (relative)
duration / fps / sizeprobe metadata
+ >] - MediaAsset [label="{MediaAsset|id: UUID (PK)\lfilename: str\lfile_path: str (S3 key)\lfile_size: int?\lstatus: pending/ready/error\lerror_message: str?\l|duration: float?\lvideo_codec: str?\laudio_codec: str?\lwidth: int?\lheight: int?\lframerate: float?\lbitrate: int?\lproperties: JSON\l|comments: str\ltags: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] + Profile [label=< + + + + + +
Profile
namestr
pipelineJSONB topology
configsJSONB per-stage
+ >] - TranscodePreset [label="{TranscodePreset|id: UUID (PK)\lname: str (unique)\ldescription: str\lis_builtin: bool\l|container: str\l|video_codec: str\lvideo_bitrate: str?\lvideo_crf: int?\lvideo_preset: str?\lresolution: str?\lframerate: float?\l|audio_codec: str\laudio_bitrate: str?\laudio_channels: int?\laudio_samplerate: int?\l|extra_args: JSON[]\l|created_at: datetime\lupdated_at: datetime\l}"] + Timeline [label=< + + + + + + + +
Timeline
idUUID PK
source_asset_idFK MediaAsset
chunk_pathsstr[]
profile_namestr
fps / statuscached, ready, ...
+ >] - TranscodeJob [label="{TranscodeJob|id: UUID (PK)\l|source_asset_id: UUID (FK)\l|preset_id: UUID? (FK)\lpreset_snapshot: JSON\l|trim_start: float?\ltrim_end: float?\l|output_filename: str\loutput_path: str? (S3 key)\loutput_asset_id: UUID? (FK)\l|status: pending/processing/...\lprogress: float (0-100)\lcurrent_frame: int?\lcurrent_time: float?\lspeed: str?\lerror_message: str?\l|celery_task_id: str?\lexecution_arn: str?\lpriority: int\l|created_at: datetime\lstarted_at: datetime?\lcompleted_at: datetime?\l}"] + Job [label=< + + + + + + + + + +
Job
idUUID PK
timeline_idFK Timeline
parent_idFK Job (replay tree)
profile_namestr
config_overridesJSONB
run_typeinitial / replay / retry
status / current_stageruntime
+ >] - MediaAsset -> TranscodeJob [xlabel="1:N source_asset"] - TranscodePreset -> TranscodeJob [xlabel="1:N preset"] - TranscodeJob -> MediaAsset [xlabel="1:1 output_asset", style=dashed] + Checkpoint [label=< + + + + + + + + +
Checkpoint
idUUID PK
timeline_idFK Timeline
job_idFK Job (nullable)
parent_idFK Checkpoint (tree)
stage_namestr
config_overrides / statsJSONB (no blobs)
+ >] + + StageOutput [label=< + + + + + + + + +
StageOutput
idUUID PK
job_idFK Job
timeline_idFK Timeline
stage_namestr
checkpoint_idFK Checkpoint (nullable)
outputJSONB (flat upsert)
+ >] + + Brand [label=< + + + + + + +
Brand
canonical_namestr (indexed)
aliasesstr[]
sourceocr / local_vlm / cloud_llm / manual
airingsJSONB[]
+ >] + + MediaAsset -> Timeline [label="source_asset_id"] + Timeline -> Job [label="timeline_id"] + Job -> Job [label="parent_id\n(replay tree)" style=dashed] + Profile -> Job [label="profile_name" color="#0066ff"] + Job -> Checkpoint [label="job_id"] + Timeline -> Checkpoint [label="timeline_id"] + Checkpoint -> Checkpoint [label="parent_id\n(tree)" style=dashed] + Job -> StageOutput [label="job_id"] + Checkpoint -> StageOutput [label="checkpoint_id" style=dotted] } diff --git a/docs/architecture/02-data-model.svg b/docs/architecture/02-data-model.svg index 8eae3a9..a130a66 100644 --- a/docs/architecture/02-data-model.svg +++ b/docs/architecture/02-data-model.svg @@ -4,125 +4,272 @@ - - + + data_model - -MPR - Data Model + +Data Model MediaAsset - -MediaAsset - -id: UUID (PK) -filename: str -file_path: str (S3 key) -file_size: int? -status: pending/ready/error -error_message: str? - -duration: float? -video_codec: str? -audio_codec: str? -width: int? -height: int? -framerate: float? -bitrate: int? -properties: JSON - -comments: str -tags: JSON[] - -created_at: datetime -updated_at: datetime + + + +MediaAsset + +id + +UUID PK + +filename + +str + +file_path + +str (relative) + +duration / fps / size + +probe metadata - + -TranscodeJob - -TranscodeJob - -id: UUID (PK) - -source_asset_id: UUID (FK) - -preset_id: UUID? (FK) -preset_snapshot: JSON - -trim_start: float? -trim_end: float? - -output_filename: str -output_path: str? (S3 key) -output_asset_id: UUID? (FK) - -status: pending/processing/... -progress: float (0-100) -current_frame: int? -current_time: float? -speed: str? -error_message: str? - -celery_task_id: str? -execution_arn: str? -priority: int - -created_at: datetime -started_at: datetime? -completed_at: datetime? +Timeline + + + +Timeline + +id + +UUID PK + +source_asset_id + +FK MediaAsset + +chunk_paths + +str[] + +profile_name + +str + +fps / status + +cached, ready, ... - + -MediaAsset->TranscodeJob - - -1:N source_asset +MediaAsset->Timeline + + +source_asset_id - + -TranscodePreset - -TranscodePreset - -id: UUID (PK) -name: str (unique) -description: str -is_builtin: bool - -container: str - -video_codec: str -video_bitrate: str? -video_crf: int? -video_preset: str? -resolution: str? -framerate: float? - -audio_codec: str -audio_bitrate: str? -audio_channels: int? -audio_samplerate: int? 
- -extra_args: JSON[] - -created_at: datetime -updated_at: datetime +Profile + + + +Profile + +name + +str + +pipeline + +JSONB topology + +configs + +JSONB per-stage - + + +Job + + + +Job + +id + +UUID PK + +timeline_id + +FK Timeline + +parent_id + +FK Job (replay tree) + +profile_name + +str + +config_overrides + +JSONB + +run_type + +initial / replay / retry + +status / current_stage + +runtime + + + +Profile->Job + + +profile_name + + -TranscodePreset->TranscodeJob - - -1:N preset +Timeline->Job + + +timeline_id - + + +Checkpoint + + + +Checkpoint + +id + +UUID PK + +timeline_id + +FK Timeline + +job_id + +FK Job (nullable) + +parent_id + +FK Checkpoint (tree) + +stage_name + +str + +config_overrides / stats + +JSONB (no blobs) + + + +Timeline->Checkpoint + + +timeline_id + + -TranscodeJob->MediaAsset - - -1:1 output_asset +Job->Job + + +parent_id +(replay tree) + + + +Job->Checkpoint + + +job_id + + + +StageOutput + + + +StageOutput + +id + +UUID PK + +job_id + +FK Job + +timeline_id + +FK Timeline + +stage_name + +str + +checkpoint_id + +FK Checkpoint (nullable) + +output + +JSONB (flat upsert) + + + +Job->StageOutput + + +job_id + + + +Checkpoint->Checkpoint + + +parent_id +(tree) + + + +Checkpoint->StageOutput + + +checkpoint_id + + + +Brand + + + +Brand + +canonical_name + +str (indexed) + +aliases + +str[] + +source + +ocr / local_vlm / cloud_llm / manual + +airings + +JSONB[] diff --git a/docs/architecture/03-detection-pipeline.dot b/docs/architecture/03-detection-pipeline.dot new file mode 100644 index 0000000..2a284e8 --- /dev/null +++ b/docs/architecture/03-detection-pipeline.dot @@ -0,0 +1,42 @@ +digraph detection_pipeline { + rankdir=TB + bgcolor="#0a0e17" + fontname="Helvetica" + node [fontname="Helvetica" fontsize=11 style=filled color="#1e2a4a" fontcolor="#e8eaf0" shape=box] + edge [fontname="Helvetica" fontsize=9 fontcolor="#8892a8" color="#4a5568"] + + label="Detection Pipeline (core/detect/graph/nodes.py)" + labelloc=t + fontsize=16 + fontcolor="#0066ff" + + extract_frames [label="extract_frames\n(ffmpeg, fps from profile)" fillcolor="#121829"] + filter_scenes [label="filter_scenes\n(scene-change filter)" fillcolor="#121829"] + + field_seg [label="field_segmentation\n(HSV mask · GPU/WASM)" fillcolor="#0d1a33" fontcolor="#0066ff"] + detect_edges [label="detect_edges\n(Canny + Hough · GPU/WASM)" fillcolor="#0d1a33" fontcolor="#0066ff"] + + detect_objects [label="detect_objects\n(YOLO · GPU)" fillcolor="#1a3a1a" fontcolor="#00c853"] + preprocess [label="preprocess\n(crop · contrast · deskew)" fillcolor="#121829"] + run_ocr [label="run_ocr\n(OCR · GPU)" fillcolor="#1a3a1a" fontcolor="#00c853"] + + match_brands [label="match_brands\n(rapidfuzz vs session)" fillcolor="#121829"] + escalate_vlm [label="escalate_vlm\n(local VLM · GPU)" fillcolor="#1a3a1a" fontcolor="#00c853"] + escalate_cloud [label="escalate_cloud\n(Anthropic · Gemini\nOpenAI · Groq)" fillcolor="#243056" shape=octagon] + + compile_report [label="compile_report\n(timeline + brand stats)" fillcolor="#0d1a33" fontcolor="#0066ff"] + + extract_frames -> filter_scenes + filter_scenes -> field_seg + filter_scenes -> detect_objects + field_seg -> detect_edges [label="masks"] + detect_edges -> detect_objects [style=dashed label="region hints"] + detect_objects -> preprocess [label="boxes"] + preprocess -> run_ocr + run_ocr -> match_brands [label="text candidates"] + match_brands -> escalate_vlm [label="unresolved"] + escalate_vlm -> escalate_cloud [label="still unresolved"] + match_brands -> compile_report + 
escalate_vlm -> compile_report + escalate_cloud -> compile_report +} diff --git a/docs/architecture/03-detection-pipeline.svg b/docs/architecture/03-detection-pipeline.svg new file mode 100644 index 0000000..916f322 --- /dev/null +++ b/docs/architecture/03-detection-pipeline.svg @@ -0,0 +1,176 @@ + + + + + + +detection_pipeline + +Detection Pipeline (core/detect/graph/nodes.py) + + +extract_frames + +extract_frames +(ffmpeg, fps from profile) + + + +filter_scenes + +filter_scenes +(scene-change filter) + + + +extract_frames->filter_scenes + + + + + +field_seg + +field_segmentation +(HSV mask · GPU/WASM) + + + +filter_scenes->field_seg + + + + + +detect_objects + +detect_objects +(YOLO · GPU) + + + +filter_scenes->detect_objects + + + + + +detect_edges + +detect_edges +(Canny + Hough · GPU/WASM) + + + +field_seg->detect_edges + + +masks + + + +detect_edges->detect_objects + + +region hints + + + +preprocess + +preprocess +(crop · contrast · deskew) + + + +detect_objects->preprocess + + +boxes + + + +run_ocr + +run_ocr +(OCR · GPU) + + + +preprocess->run_ocr + + + + + +match_brands + +match_brands +(rapidfuzz vs session) + + + +run_ocr->match_brands + + +text candidates + + + +escalate_vlm + +escalate_vlm +(local VLM · GPU) + + + +match_brands->escalate_vlm + + +unresolved + + + +compile_report + +compile_report +(timeline + brand stats) + + + +match_brands->compile_report + + + + + +escalate_cloud + +escalate_cloud +(Anthropic · Gemini +OpenAI · Groq) + + + +escalate_vlm->escalate_cloud + + +still unresolved + + + +escalate_vlm->compile_report + + + + + +escalate_cloud->compile_report + + + + + diff --git a/docs/architecture/03-job-flow.dot b/docs/architecture/03-job-flow.dot deleted file mode 100644 index 0ae5b6e..0000000 --- a/docs/architecture/03-job-flow.dot +++ /dev/null @@ -1,104 +0,0 @@ -digraph job_flow { - rankdir=TB - node [shape=box, style=rounded, fontname="Helvetica"] - edge [fontname="Helvetica", fontsize=10] - - labelloc="t" - label="MPR - Job Flow" - fontsize=16 - fontname="Helvetica-Bold" - - graph [splines=ortho, nodesep=0.6, ranksep=0.6] - - // API entry points - subgraph cluster_api { - label="API Entry Points" - style=dashed - color=gray - - rest_create [label="POST /api/jobs/", shape=ellipse] - gql_create [label="mutation createJob", shape=ellipse] - rest_cancel [label="POST /api/jobs/{id}/cancel", shape=ellipse] - rest_callback [label="POST /api/jobs/{id}/callback", shape=ellipse] - } - - // Job states - subgraph cluster_states { - label="Job States" - style=filled - fillcolor="#f8f8f8" - - pending [label="PENDING", fillcolor="#ffc107", style="filled,rounded"] - processing [label="PROCESSING", fillcolor="#17a2b8", style="filled,rounded", fontcolor=white] - completed [label="COMPLETED", fillcolor="#28a745", style="filled,rounded", fontcolor=white] - failed [label="FAILED", fillcolor="#dc3545", style="filled,rounded", fontcolor=white] - cancelled [label="CANCELLED", fillcolor="#6c757d", style="filled,rounded", fontcolor=white] - } - - // State transitions - pending -> processing [xlabel="worker picks up"] - processing -> completed [xlabel="success"] - processing -> failed [xlabel="error"] - pending -> cancelled [xlabel="user cancels"] - processing -> cancelled [xlabel="user cancels"] - failed -> pending [xlabel="retry"] - - rest_create -> pending - gql_create -> pending - rest_cancel -> cancelled [style=dashed] - - // Executor dispatch - subgraph cluster_dispatch { - label="Executor Dispatch" - style=filled - fillcolor="#fff8e8" - - dispatch [label="MPR_EXECUTOR", 
shape=diamond] - } - - pending -> dispatch - - // Local path - subgraph cluster_local { - label="Local Mode (Celery)" - style=filled - fillcolor="#e8f4e8" - - celery_task [label="Celery Task\n(transcode queue)"] - s3_download [label="S3 Download\n(MinIO)"] - ffmpeg_local [label="FFmpeg\ntranscode/trim"] - s3_upload [label="S3 Upload\n(MinIO)"] - db_update [label="DB Update\n(update_job_progress)"] - } - - dispatch -> celery_task [xlabel="local"] - celery_task -> s3_download - s3_download -> ffmpeg_local - ffmpeg_local -> s3_upload - s3_upload -> db_update - db_update -> completed [style=dotted] - - // Lambda path - subgraph cluster_lambda { - label="Lambda Mode (AWS)" - style=filled - fillcolor="#fde8d0" - - sfn_start [label="Step Functions\nstart_execution"] - lambda_fn [label="Lambda\nFFmpeg container"] - s3_dl_aws [label="S3 Download\n(AWS)"] - ffmpeg_aws [label="FFmpeg\ntranscode/trim"] - s3_ul_aws [label="S3 Upload\n(AWS)"] - callback [label="HTTP Callback\nPOST /jobs/{id}/callback"] - } - - dispatch -> sfn_start [xlabel="lambda"] - sfn_start -> lambda_fn - lambda_fn -> s3_dl_aws - s3_dl_aws -> ffmpeg_aws - ffmpeg_aws -> s3_ul_aws - s3_ul_aws -> callback - callback -> completed [style=dotted] - - rest_callback -> completed [style=dashed, xlabel="Lambda reports"] -} diff --git a/docs/architecture/03-job-flow.svg b/docs/architecture/03-job-flow.svg deleted file mode 100644 index cb09ad1..0000000 --- a/docs/architecture/03-job-flow.svg +++ /dev/null @@ -1,329 +0,0 @@ - - - - - - -job_flow - -MPR - Job Flow - -cluster_api - -API Entry Points - - -cluster_states - -Job States - - -cluster_dispatch - -Executor Dispatch - - -cluster_local - -Local Mode (Celery) - - -cluster_lambda - -Lambda Mode (AWS) - - - -rest_create - -POST /api/jobs/ - - - -pending - -PENDING - - - -rest_create->pending - - - - - -gql_create - -mutation createJob - - - -gql_create->pending - - - - - -rest_cancel - -POST /api/jobs/{id}/cancel - - - -cancelled - -CANCELLED - - - -rest_cancel->cancelled - - - - - -rest_callback - -POST /api/jobs/{id}/callback - - - -completed - -COMPLETED - - - -rest_callback->completed - - -Lambda reports - - - -processing - -PROCESSING - - - -pending->processing - - -worker picks up - - - -pending->cancelled - - -user cancels - - - -dispatch - -MPR_EXECUTOR - - - -pending->dispatch - - - - - -processing->completed - - -success - - - -failed - -FAILED - - - -processing->failed - - -error - - - -processing->cancelled - - -user cancels - - - -failed->pending - - -retry - - - -celery_task - -Celery Task -(transcode queue) - - - -dispatch->celery_task - - -local - - - -sfn_start - -Step Functions -start_execution - - - -dispatch->sfn_start - - -lambda - - - -s3_download - -S3 Download -(MinIO) - - - -celery_task->s3_download - - - - - -ffmpeg_local - -FFmpeg -transcode/trim - - - -s3_download->ffmpeg_local - - - - - -s3_upload - -S3 Upload -(MinIO) - - - -ffmpeg_local->s3_upload - - - - - -db_update - -DB Update -(update_job_progress) - - - -s3_upload->db_update - - - - - -db_update->completed - - - - - -lambda_fn - -Lambda -FFmpeg container - - - -sfn_start->lambda_fn - - - - - -s3_dl_aws - -S3 Download -(AWS) - - - -lambda_fn->s3_dl_aws - - - - - -ffmpeg_aws - -FFmpeg -transcode/trim - - - -s3_dl_aws->ffmpeg_aws - - - - - -s3_ul_aws - -S3 Upload -(AWS) - - - -ffmpeg_aws->s3_ul_aws - - - - - -callback - -HTTP Callback -POST /jobs/{id}/callback - - - -s3_ul_aws->callback - - - - - -callback->completed - - - - - diff --git a/docs/architecture/04-media-storage.md 
b/docs/architecture/04-media-storage.md index 43734fe..92f7769 100644 --- a/docs/architecture/04-media-storage.md +++ b/docs/architecture/04-media-storage.md @@ -1,31 +1,24 @@ -# Media Storage Architecture +# Media & Artifact Storage ## Overview -MPR uses **S3-compatible storage** everywhere. Locally via MinIO, in production via AWS S3. The same boto3 code and S3 keys work in both environments - the only difference is the `S3_ENDPOINT_URL` env var. +MPR stores everything on **S3-compatible** object storage. Locally that's MinIO; in any +cloud target (AWS, GCS via HMAC, Cloudflare R2, etc.) it's the provider's S3 API. The +code in `core/storage/` uses boto3 throughout — only the endpoint URL and credentials +change between environments. -## Storage Strategy +## What goes where -### S3 Buckets +| Bucket / prefix | Contents | Producer | Consumer | +|---|---|---|---| +| `mpr-media-in` | Source video files (chunks the user uploaded or device-recorded) | user / chunker UI | `extract_frames` stage, `core/api/detect/sources.py` | +| `mpr-media-out` | Per-job artifacts: extracted frame caches, debug overlays | pipeline stages, `core/api/detect/replay.py` overlays endpoints | UI panels (frame strip, overlay viewer) | -| Bucket | Env Var | Purpose | -|--------|---------|---------| -| `mpr-media-in` | `S3_BUCKET_IN` | Source media files | -| `mpr-media-out` | `S3_BUCKET_OUT` | Transcoded/trimmed output | +Both buckets live behind the same S3 client (`core/storage/`). DB rows store relative +keys (e.g. `chunks/2025-04-15/match-01.mp4`); the bucket is implicit. -### S3 Keys as File Paths -- **Database**: Stores S3 object keys (e.g., `video1.mp4`, `subfolder/video3.mp4`) -- **Local dev**: MinIO serves these via S3 API on port 9000 -- **AWS**: Real S3, same keys, different endpoint +## Local development (MinIO) -### Why S3 Everywhere? -1. **Identical code paths** - no branching between local and cloud -2. **Seamless executor switching** - Celery and Lambda both use boto3 -3. **Cloud-native** - ready for production without refactoring - -## Local Development (MinIO) - -### Configuration ```bash S3_ENDPOINT_URL=http://minio:9000 S3_BUCKET_IN=mpr-media-in @@ -34,137 +27,49 @@ AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin ``` -### How It Works -- MinIO runs as a Docker container (port 9000 API, port 9001 console) -- `minio-init` container creates buckets and sets public read access on startup -- Nginx proxies `/media/in/` and `/media/out/` to MinIO buckets -- Upload files via MinIO Console (http://localhost:9001) or `mc` CLI +In the Tilt setup, MinIO runs as a k8s Deployment with port-forwards for `9000` (S3 API) +and `9001` (web console). A `minio-init` job creates the buckets on first start. + +## Cloud (AWS S3 / GCS / others) -### Upload Files to MinIO ```bash -# Using mc CLI -mc alias set local http://localhost:9000 minioadmin minioadmin -mc cp video.mp4 local/mpr-media-in/ - -# Using aws CLI with endpoint override -aws --endpoint-url http://localhost:9000 s3 cp video.mp4 s3://mpr-media-in/ -``` - -## AWS Production (S3) - -### Configuration -```bash -# No S3_ENDPOINT_URL = uses real AWS S3 -S3_BUCKET_IN=mpr-media-in -S3_BUCKET_OUT=mpr-media-out +# AWS S3 — no endpoint URL needed +S3_BUCKET_IN=... +S3_BUCKET_OUT=... AWS_REGION=us-east-1 -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= -``` +AWS_ACCESS_KEY_ID=... +AWS_SECRET_ACCESS_KEY=... 
-### Upload Files to S3 -```bash -aws s3 cp video.mp4 s3://mpr-media-in/ -aws s3 sync /local/media/ s3://mpr-media-in/ -``` - -## GCP Production (GCS via S3 compatibility) - -GCS exposes an S3-compatible API. The same `core/storage/s3.py` boto3 code works -with no changes — only the endpoint and credentials differ. - -### GCS HMAC Keys -Generate under **Cloud Storage → Settings → Interoperability** in the GCP console. -These act as `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`. - -### Configuration -```bash +# GCS via HMAC S3_ENDPOINT_URL=https://storage.googleapis.com -S3_BUCKET_IN=mpr-media-in -S3_BUCKET_OUT=mpr-media-out -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= - -# Executor -MPR_EXECUTOR=gcp -GCP_PROJECT_ID=my-project -GCP_REGION=us-central1 -CLOUD_RUN_JOB=mpr-transcode -CALLBACK_URL=https://mpr.mcrn.ar/api -CALLBACK_API_KEY= +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= ``` -### Upload Files to GCS -```bash -gcloud storage cp video.mp4 gs://mpr-media-in/ +## Database vs. object storage -# Or with the aws CLI via compat endpoint -aws --endpoint-url https://storage.googleapis.com s3 cp video.mp4 s3://mpr-media-in/ -``` +Heavy artifacts (frames, masks, overlays) live in MinIO/S3. The `Checkpoint` and +`StageOutput` tables in Postgres (see `02-data-model.svg`) hold structured outputs +(detections, stats, references to S3 keys) — never blobs. Frame caches keyed by +`timeline_id` are written by the first run of `extract_frames` and reused by every +later replay on the same timeline. -### Cloud Run Job Handler -`core/task/gcp_handler.py` is the Cloud Run Job entrypoint. It reads the job payload -from `MPR_JOB_PAYLOAD` (injected by `GCPExecutor`), uses `core/storage` for all -GCS access (S3 compat), and POSTs the completion callback to the API. +## Storage module -Set the Cloud Run Job command to: `python -m core.task.gcp_handler` - -## Storage Module - -`core/storage/` package provides all S3 operations: +`core/storage/` exposes the small set of helpers callers need: ```python from core.storage import ( - get_s3_client, # boto3 client (MinIO or AWS) - list_objects, # List bucket contents, filter by extension - download_file, # Download S3 object to local path - download_to_temp, # Download to temp file (caller cleans up) - upload_file, # Upload local file to S3 - get_presigned_url, # Generate presigned URL - BUCKET_IN, # Input bucket name - BUCKET_OUT, # Output bucket name + get_s3_client, + list_objects, + download_file, + download_to_temp, + upload_file, + get_presigned_url, + BUCKET_IN, + BUCKET_OUT, ) ``` -## API Endpoints - -### Scan Media (REST) -```http -POST /api/assets/scan -``` -Lists objects in `S3_BUCKET_IN`, registers new media files. - -### Scan Media (GraphQL) -```graphql -mutation { scanMediaFolder { found registered skipped files } } -``` - -## Job Flow with S3 - -### Local Mode (Celery) -1. Celery task receives `source_key` and `output_key` -2. Downloads source from `S3_BUCKET_IN` to temp file -3. Runs FFmpeg locally -4. Uploads result to `S3_BUCKET_OUT` -5. Cleans up temp files - -### Lambda Mode (AWS) -1. Step Functions invokes Lambda with S3 keys -2. Lambda downloads source from `S3_BUCKET_IN` to `/tmp` -3. Runs FFmpeg in container -4. Uploads result to `S3_BUCKET_OUT` -5. Calls back to API with result - -### Cloud Run Job Mode (GCP) -1. `GCPExecutor` triggers Cloud Run Job with payload in `MPR_JOB_PAYLOAD` -2. `core/task/gcp_handler.py` downloads source from `S3_BUCKET_IN` (GCS S3 compat) -3. Runs FFmpeg in container -4. 
Uploads result to `S3_BUCKET_OUT` (GCS S3 compat) -5. Calls back to API with result - -All three paths use the same S3-compatible bucket names and key structure. - -## Supported File Types - -**Video:** `.mp4`, `.mkv`, `.avi`, `.mov`, `.webm`, `.flv`, `.wmv`, `.m4v` -**Audio:** `.mp3`, `.wav`, `.flac`, `.aac`, `.ogg`, `.m4a` +Anything else (multipart, lifecycle, versioning) is the bucket's responsibility, not +the application's. diff --git a/docs/architecture/05-chunker-pipeline.md b/docs/architecture/05-chunker-pipeline.md deleted file mode 100644 index 04e9e46..0000000 --- a/docs/architecture/05-chunker-pipeline.md +++ /dev/null @@ -1,290 +0,0 @@ -# Chunker Pipeline — Execution Path - -## Overview - -The chunker pipeline splits a media file into time-based segments using FFmpeg stream-copy. Events flow from worker threads through Redis and gRPC-Web streaming to the browser UI in real time. - -**7 hops from worker thread to pixel:** - -``` -Worker thread → Pipeline._emit() → event_bridge() → Redis RPUSH - → [50ms poll] gRPC server LRANGE → yield protobuf - → HTTP/2 frame → Envoy (grpc-web filter) - → HTTP/1.1 chunk → nginx (proxy_buffering off) - → fetch ReadableStream → protobuf-ts decode - → setEvents([...prev, evt]) → React re-render -``` - ---- - -## Step 1: Job Creation (Browser → GraphQL → Celery) - -``` -User clicks "Start" - → App.tsx: handleStart(config) - → api.ts: createChunkJob(config) - → POST /graphql (nginx :80 → fastapi:8702) - → graphql.py: Mutation.create_chunk_job() - → core.db: creates ChunkJob row in Postgres - → Celery: run_job.delay(job_type="chunk", job_id=..., payload=...) - → Returns { id, celery_task_id } to browser - → App.tsx: setJobId(id) — triggers gRPC stream subscription -``` - -**Files:** `ui/chunker/src/api.ts`, `core/api/graphql.py`, `core/jobs/task.py` - ---- - -## Step 2: gRPC-Web Stream (Browser → nginx → Envoy → gRPC Server) - -Once `jobId` is set, `useGrpcStream(jobId)` opens a server-streaming RPC: - -``` -useGrpcStream(jobId) fires useEffect - → GrpcWebFetchTransport({ baseUrl: "/grpc-web" }) - → WorkerServiceClient.streamChunkPipeline({ jobId }) - → fetch() POST to /grpc-web/worker.WorkerService/StreamChunkPipeline - → nginx :80 /grpc-web/ (proxy_pass → envoy:8090, proxy_buffering off) - → Envoy :8090 (grpc_web filter: HTTP/1.1 grpc-web → HTTP/2 native gRPC) - → gRPC server :50051 WorkerServicer.StreamChunkPipeline() - → Enters Redis polling loop (Step 5) -``` - -**Files:** `ui/chunker/src/hooks/useGrpcStream.ts`, `ctrl/nginx.conf`, `ctrl/envoy.yaml`, `core/rpc/server.py` - -**Key nginx config:** `proxy_buffering off` is critical — without it, nginx collects the entire upstream response before forwarding, defeating streaming entirely. - ---- - -## Step 3: Celery Worker → ChunkHandler - -``` -Celery picks up run_job task - → task.py: run_job(job_type="chunk", job_id, payload) - → registry.get_handler("chunk") → ChunkHandler - → chunk.py: ChunkHandler.process(job_id, payload) - → download_to_temp(BUCKET_IN, source_key) — pulls source from MinIO/S3 - → Creates output_dir: /app/media/out/chunks/{job_id}/ - → Constructs event_bridge callback (bridges Pipeline events → Redis) - → pipeline = Pipeline(source, ..., event_callback=event_bridge, output_dir=...) - → pipeline.run() -``` - -**Files:** `core/jobs/task.py`, `core/jobs/handlers/chunk.py` - -The `event_bridge` closure wraps every `Pipeline._emit()` call, forwarding to `push_event(job_id, event_type, data)` which writes to Redis. 
- ---- - -## Step 4: Pipeline Orchestration (inside Celery worker process) - -`Pipeline.run()` spawns multiple threads: - -``` -pipeline.run(): - │ - ├─ Chunker(source, chunk_duration) - │ → ffprobe source file → gets duration, file_size - │ → calculates total_chunks = ceil(duration / chunk_duration) - │ - ├─ _emit("pipeline_start", {...}) → event_bridge → Redis - ├─ _emit("pipeline_info", {file_size, duration, total_chunks}) → Redis - │ - ├─ Creates ChunkQueue(maxsize=10) - ├─ Creates WorkerPool(num_workers=N, chunk_queue, processor, event_callback) - │ - ├─ pool.start() — spawns N worker threads - │ - ├─ MONITOR THREAD starts (_monitor_progress) - │ → Every 500ms: _emit("pipeline_progress", {elapsed, throughput_mbps}) → Redis - │ - ├─ PRODUCER THREAD starts (_produce_chunks) - │ → Iterates chunker.chunks() → yields Chunk(sequence, start_time, end_time) - │ → For each: chunk_queue.put(chunk) - │ → _emit("chunk_queued", {sequence, start_time, end_time, queue_size}) → Redis - │ → chunk_queue.close() when done (sends N sentinel Nones) - │ - ├─ WORKER THREADS (N concurrent, each runs worker.py:Worker.run()) - │ │ Each worker loops: - │ │ - │ ├─ chunk = chunk_queue.get(timeout=1.0) - │ ├─ _emit("chunk_processing", {sequence, state:"processing", queue_size}) → Redis - │ │ - │ ├─ processor.process(chunk) - │ │ ├─ ffmpeg: runs `ffmpeg -ss start -to end -c copy chunk_NNNN.mp4` - │ │ ├─ simulated_decode: sleep(random) + checksum - │ │ └─ checksum: reads bytes, computes hash - │ │ - │ ├─ On success: _emit("chunk_done", {sequence, processing_time, retries, queue_size}) → Redis - │ ├─ On failure: retries with exponential backoff (0.1s, 0.2s, 0.4s...) - │ │ └─ _emit("chunk_retry", {sequence, attempt, backoff}) → Redis - │ │ └─ _emit("chunk_error", {sequence, error, retries}) → Redis (after exhaustion) - │ │ - │ └─ On sentinel (None): _emit("worker_status", {state:"stopped"}) → Redis - │ - ├─ pool.wait() — joins all worker threads, collects results - ├─ monitor_stop.set() — stops progress monitor - │ - ├─ ResultCollector — reassembles results in sequence order - │ └─ _emit("chunk_collected", {sequence, buffered, emitted}) → Redis - │ - ├─ Writes manifest.json to output_dir - │ - └─ _emit("pipeline_complete", {total_chunks, processed, failed, elapsed, throughput}) → Redis -``` - -**Files:** `core/chunker/pipeline.py`, `core/chunker/worker.py`, `core/chunker/pool.py`, `core/chunker/chunker.py`, `core/chunker/collector.py` - ---- - -## Step 5: Redis — the Event Bus - -``` -WRITE side (Celery worker, all threads): - push_event(job_id, event_type, data) - → json.dumps({"event": event_type, ...data}) - → Redis RPUSH to key "chunk_events:{job_id}" - → Redis EXPIRE 3600 (1 hour TTL) - -READ side (gRPC server, StreamChunkPipeline): - poll_events(job_id, cursor) - → Redis LRANGE "chunk_events:{job_id}" cursor -1 - → Returns (parsed_events, new_cursor) - → Called every 50ms (time.sleep(0.05) in server loop) -``` - -Redis acts as a decoupling layer between the Celery worker process (which runs the pipeline) and the gRPC server process (which streams to browsers). Events are appended with RPUSH and read with cursor-based LRANGE polling. 
- -**Files:** `core/events.py` - ---- - -## Step 6: gRPC Server → Envoy → nginx → Browser - -``` -server.py: StreamChunkPipeline polling loop: - while context.is_active(): - events, cursor = poll_events(job_id, cursor) ← Redis LRANGE - for data in events: - yield worker_pb2.ChunkPipelineEvent( ← serialized protobuf message - job_id, event_type, sequence, worker_id, - state, queue_size, elapsed, throughput_mbps, - total_chunks, processed_chunks, failed_chunks, - error, processing_time, retries - ) - if event_type in ("pipeline_complete", "pipeline_error"): - return ← ends the stream - time.sleep(0.05) ← 50ms poll interval - - Each yield sends: - → gRPC HTTP/2 DATA frame to Envoy - → Envoy grpc_web filter: HTTP/2 → base64-encoded grpc-web-text - → nginx proxy_pass (proxy_buffering off) → chunked HTTP/1.1 to browser - → fetch() ReadableStream in GrpcWebFetchTransport - → @protobuf-ts decodes protobuf → ChunkPipelineEvent TypeScript object -``` - -**Files:** `core/rpc/server.py`, `ctrl/envoy.yaml`, `ctrl/nginx.conf`, `ui/common/api/grpc/worker.ts`, `ui/common/api/grpc/worker.client.ts` - ---- - -## Step 7: React State Derivation and Rendering - -``` -useGrpcStream.ts: - for await (const msg of stream.responses): - const evt = toEvent(msg) ← maps protobuf camelCase → snake_case PipelineEvent - setEvents(prev => [...prev, evt]) ← appends to events array - if pipeline_complete/error → setDone(true), break - -App.tsx useMemo(events): - Iterates ALL events on every update, derives: - ├─ chunkMap: Map — state machine per chunk - │ pending → queued → processing → done/error/retry - ├─ workerMap: Map — state per worker - │ idle → processing → idle → ... → stopped - ├─ stats: PipelineStats - │ total_chunks, processed, failed, retries, elapsed, throughput_mbps, queue_size - ├─ errors: ErrorEntry[] — every event containing an error field - └─ queueSize: number — last seen queue_size value - - Renders: - ├─ ChunkGrid — colored cells per chunk (pending/queued/processing/done/error) - ├─ QueueGauge — current queue depth / max - ├─ WorkerPanel — per-worker state + current chunk assignment - ├─ StatsPanel — elapsed time, throughput, processed/failed counts - ├─ ErrorLog — scrollable error list - └─ OutputFiles — download links (when done) -``` - -**Files:** `ui/chunker/src/hooks/useGrpcStream.ts`, `ui/chunker/src/App.tsx` - ---- - -## Step 8: Output File Access (after pipeline completes) - -``` -App.tsx useEffect([done, jobId]): - → api.ts: getChunkOutputFiles(jobId) - → POST /graphql → graphql.py: chunk_output_files(job_id) - → Reads /app/media/out/chunks/{job_id}/ directory listing from disk - → Returns [{key, size, url: "/media/out/chunks/{job_id}/chunk_0001.mp4"}] - → Browser renders download links - → Click link → nginx /media/out/ → alias /app/media/out/ → serves file from disk -``` - -Chunks are written directly to `media/out/chunks/{job_id}/` by the ffmpeg processor — no MinIO upload needed for output. Nginx serves them with `autoindex on`. 
- -**Files:** `core/api/graphql.py`, `core/jobs/handlers/chunk.py`, `ctrl/nginx.conf` - ---- - -## Event Types Reference - -| Event | Source | Key Fields | -|-------|--------|------------| -| `pipeline_start` | Pipeline.run() | source, chunk_duration, num_workers, processor_type | -| `pipeline_info` | Pipeline.run() | file_size, source_duration, total_chunks | -| `pipeline_progress` | Monitor thread (500ms) | elapsed, throughput_mbps | -| `chunk_queued` | Producer thread | sequence, start_time, end_time, duration, queue_size | -| `chunk_processing` | Worker thread | sequence, worker_id, state, queue_size | -| `chunk_done` | Worker thread | sequence, processing_time, retries, queue_size | -| `chunk_retry` | Worker thread | sequence, attempt, backoff | -| `chunk_error` | Worker thread | sequence, error, retries | -| `chunk_collected` | ResultCollector | sequence, buffered, emitted | -| `worker_status` | Worker thread | worker_id, state (idle/processing/stopped) | -| `pipeline_complete` | Pipeline.run() | total_chunks, processed, failed, elapsed, throughput_mbps | -| `pipeline_error` | Pipeline.run() | error | - ---- - -## Thread Model (inside Celery worker) - -``` -Celery worker process - └─ run_job task thread - └─ Pipeline.run() - ├─ Producer thread — enqueues chunks - ├─ Monitor thread — emits progress every 500ms - ├─ Worker thread 0 — pulls from queue, processes - ├─ Worker thread 1 — pulls from queue, processes - ├─ Worker thread 2 — pulls from queue, processes - └─ Worker thread 3 — pulls from queue, processes -``` - -All threads share the same `event_callback` → `event_bridge` → `push_event()`, which creates a new Redis connection per call. Thread-safe via Redis atomic RPUSH. - ---- - -## Infrastructure - -| Service | Port | Role | -|---------|------|------| -| nginx | 80 | Reverse proxy, static file serving | -| fastapi | 8702 | GraphQL API (Strawberry) | -| celery | — | Task worker (runs pipeline) | -| redis | 6379 | Event bus + Celery broker | -| grpc | 50051 | gRPC server (StreamChunkPipeline) | -| envoy | 8090 | gRPC-Web ↔ native gRPC translation | -| minio | 9000 | S3-compatible source media storage | -| postgres | 5432 | Job/asset metadata | diff --git a/docs/architecture/05-detection-pipeline.md b/docs/architecture/05-detection-pipeline.md new file mode 100644 index 0000000..e67686f --- /dev/null +++ b/docs/architecture/05-detection-pipeline.md @@ -0,0 +1,145 @@ +# Detection Pipeline — Execution Path + +## Overview + +A pipeline run is a sequence of named **stages** that read and write a shared +`DetectState` dict. Stages are defined in `core/detect/stages/`; the orchestrator +(`core/detect/graph/runner.py`) flattens the profile's `PipelineConfig` graph into a +linear order, runs each stage, and emits SSE events to the browser. + +The full stage list is in `core/detect/graph/nodes.py`: + +``` +extract_frames → filter_scenes + → field_segmentation → detect_edges + → detect_objects → preprocess → run_ocr + → match_brands → escalate_vlm → escalate_cloud + → compile_report +``` + +See `03-detection-pipeline.svg` for the graph view. + +## Profile + +A `Profile` row in Postgres holds two JSONB blobs: + +- `pipeline` — a `PipelineConfig` (stages + edges + routing rules) defining topology +- `configs` — `{stage_name: {...}}` per-stage parameters (fps, thresholds, prompts, ...) + +Profiles are the config mechanism: **duplicate a profile and tweak it** instead of +patching defaults. 
`core/detect/profile.py` loads profiles by name; `_load_profile()` +in `nodes.py` merges the job's `config_overrides` on top. + +## Stage runner + +`PipelineRunner` (in `core/detect/graph/runner.py`) iterates the flattened stages and +between each one checks three control flags (all keyed by `job_id`): + +- **cancel** — `set_cancel_check(job_id, fn)`; raises `PipelineCancelled` to abort +- **pause / resume** — a `threading.Event` per job; `_wait_if_paused()` blocks +- **step** — like resume but auto-pauses after the next stage completes +- **pause-after-stage** — toggle to step through every stage + +Each stage runs inside `trace_node(state, name)` (sets a span used by tracing) and +emits `running` → `done` (or `skipped`) transitions via `core/detect/emit.py`. + +## Inference: GPU-host indirection + +`core/detect/graph/nodes.py` reads `INFERENCE_URL` from the environment and passes it +to every CV/ML stage: + +- `INFERENCE_URL=""` (default in dev) — stages call CV/ML routines in-process +- `INFERENCE_URL=http://gpu-host:8000` — stages POST to the GPU server + (`core/gpu/server.py`) which exposes `/detect`, `/ocr`, `/preprocess`, `/vlm`, + `/detect_edges`, `/segment_field` (each with a `/debug` variant that returns + intermediate masks for the overlay viewer) + +Memory note: dev and GPU machines are separate boxes on the same LAN; inference is a +network call. Heavy ML deps (`torch`, `transformers`, `paddleocr`) live only in +`core/gpu/pyproject.toml` — the API host doesn't need them. + +## Browser-side CV (OpenCV WASM) + +Some stages (notably the field/edge stages) can run in the browser via OpenCV WASM +(`ui/detection-app/src/cv/wasmBridge.ts`) for fast iteration without a round trip to +the GPU host. The browser UI is the test surface for the "replay loop" — change a +config, replay one stage, see the overlay. Browser CV uses OpenCV WASM directly; there +are no TypeScript ports of the algorithms. + +## Cloud VLM escalation + +`escalate_vlm` (local VLM on GPU host) and `escalate_cloud` (Anthropic / Gemini / +OpenAI / Groq via `core/detect/providers/`) are the last-resort branches for +unresolved candidates from `match_brands`. Skip flags: + +- `SKIP_VLM=1` — emits `skipped` for `escalate_vlm` +- `SKIP_CLOUD=1` — emits `skipped` for `escalate_cloud` + +## Checkpoints, StageOutput, and replay + +Two tables back the replay loop: + +- **Checkpoint** (`core/db/models.py:Checkpoint`) — a tree node: + `(parent_id, stage_name, config_overrides, stats)`. No blobs. Lets the UI show a + branching history of "what configs did we try at this stage?" +- **StageOutput** — a flat upsert table keyed by `(job_id, stage_name)` holding the + stage's output dict. `replay-stage` reads upstream outputs from here so a single + stage can be re-run without rerunning the whole pipeline. + +API surface (`core/api/detect/replay.py`): + +- `GET /checkpoints/{timeline_id}` — full tree +- `POST /replay` — clone a checkpoint into a new job, run from a chosen stage +- `POST /replay-stage` — re-run one stage in place using upstream `StageOutput` rows +- `GET /overlays/{timeline_id}/{job_id}/{stage}/{seq}` — debug overlays from MinIO + +## Event flow (SSE) + +Stages call `emit.transition(...)` / `emit.log(...)` / `emit.boxes(...)` etc. +(`core/detect/emit.py`). These push into Redis (`core/detect/events.py`). The SSE +endpoint `GET /detect/stream/{job_id}` (`core/api/detect/sse.py`) drains the Redis +list and writes to the open SSE response. Envoy keeps the connection open for up to +3600s (see `ctrl/k8s/base/envoy.yaml`). 
+ +``` +stage code + → emit.* (core/detect/emit.py) + → push_detect_event → Redis RPUSH + → [poll] /detect/stream/{job_id} → SSE chunk + → fetch ReadableStream in detection-app + → Pinia store update → Vue panel re-render +``` + +## Pipeline control endpoints + +All under `core/api/detect/run.py`: + +- `POST /run` — start a job from a timeline + profile +- `POST /stop/{job_id}` — cancel +- `POST /pause/{job_id}` / `POST /resume/{job_id}` +- `POST /step/{job_id}` — run one stage and pause +- `POST /pause-after-stage/{job_id}` — toggle pause-after-each-stage +- `GET /status/{job_id}` — current stage, progress +- `POST /clear/{job_id}` — discard runtime state + +## Where the chunker UI fits + +`ui/chunker/` is a **standalone testing utility** for the source-chunking step (split +a long source video into chunks the user picks for a Timeline). It is **not** a +pipeline stage and is not part of the detection flow. The detection pipeline reads +already-chunked sources from MinIO via `core/api/detect/sources.py`. + +## Files + +| Concern | File | +|---|---| +| Stage list | `core/detect/graph/nodes.py` | +| Runner (cancel/pause/resume) | `core/detect/graph/runner.py` | +| Profile loading | `core/detect/profile.py` | +| Event emission | `core/detect/emit.py`, `core/detect/events.py` | +| SSE endpoint | `core/api/detect/sse.py` | +| Replay API | `core/api/detect/replay.py` | +| Checkpoint storage | `core/detect/checkpoint/storage.py` | +| GPU server | `core/gpu/server.py` | +| Browser CV bridge | `ui/detection-app/src/cv/wasmBridge.ts` | +| Cloud VLM providers | `core/detect/providers/` | diff --git a/docs/architecture/styles.css b/docs/architecture/styles.css deleted file mode 100644 index b3094f2..0000000 --- a/docs/architecture/styles.css +++ /dev/null @@ -1,209 +0,0 @@ -:root { - --bg-color: #1a1a2e; - --text-color: #e8e8e8; - --accent-color: #4a90d9; - --border-color: #333; - --sidebar-width: 220px; - --sidebar-bg: #151528; -} - -* { - box-sizing: border-box; - margin: 0; - padding: 0; -} - -body { - font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, sans-serif; - background-color: var(--bg-color); - color: var(--text-color); - line-height: 1.6; -} - -/* Sidebar navigation */ -.sidebar { - position: fixed; - top: 0; - left: 0; - width: var(--sidebar-width); - height: 100vh; - background: var(--sidebar-bg); - border-right: 1px solid var(--border-color); - padding: 1.5rem 1rem; - overflow-y: auto; - z-index: 10; -} - -.sidebar h2 { - font-size: 1.2rem; - color: var(--accent-color); - margin-bottom: 1.5rem; - padding-bottom: 0.5rem; - border-bottom: 1px solid var(--border-color); -} - -.sidebar ul { - list-style: none; - display: flex; - flex-direction: column; - gap: 0.25rem; -} - -.sidebar li { - display: block; -} - -.sidebar a { - display: block; - padding: 0.4rem 0.6rem; - color: var(--text-color); - text-decoration: none; - font-size: 0.85rem; - border-radius: 4px; - transition: background 0.15s, color 0.15s; -} - -.sidebar a:hover { - background: rgba(74, 144, 217, 0.15); - color: var(--accent-color); -} - -/* Main content */ -.content { - margin-left: var(--sidebar-width); - padding: 2rem; -} - -h1 { - font-size: 2rem; - margin-bottom: 1rem; - color: var(--accent-color); -} - -.content > h2 { - font-size: 1.5rem; - margin: 2rem 0 1rem; - color: var(--text-color); - border-bottom: 1px solid var(--border-color); - padding-bottom: 0.5rem; - scroll-margin-top: 1rem; -} - -.diagram-container { - display: flex; - flex-wrap: wrap; - gap: 2rem; - margin-top: 1rem; 
-} - -.diagram { - flex: 1; - min-width: 400px; - background: #252540; - border-radius: 8px; - padding: 1rem; - border: 1px solid var(--border-color); -} - -.diagram h3 { - font-size: 1.1rem; - margin-bottom: 0.5rem; - color: var(--accent-color); -} - -.diagram img, -.diagram object { - width: 100%; - height: auto; - background: white; - border-radius: 4px; -} - -.diagram a { - display: block; - text-align: center; - margin-top: 0.5rem; - color: var(--accent-color); - text-decoration: none; - font-size: 0.9rem; -} - -.diagram a:hover { - text-decoration: underline; -} - -.legend { - margin-top: 2rem; - padding: 1rem; - background: #252540; - border-radius: 8px; - border: 1px solid var(--border-color); -} - -.legend h3 { - margin-bottom: 0.5rem; -} - -.legend ul { - list-style: none; - display: flex; - flex-wrap: wrap; - gap: 1rem; -} - -.legend li { - display: flex; - align-items: center; - gap: 0.5rem; -} - -.legend .color-box { - width: 16px; - height: 16px; - border-radius: 3px; -} - -code { - background: #333; - padding: 0.2rem 0.4rem; - border-radius: 3px; - font-family: 'Monaco', 'Consolas', monospace; - font-size: 0.9em; -} - -pre { - background: #252540; - padding: 1rem; - border-radius: 8px; - overflow-x: auto; - border: 1px solid var(--border-color); -} - -pre code { - background: none; - padding: 0; -} - -/* Responsive: collapse sidebar on small screens */ -@media (max-width: 768px) { - .sidebar { - position: static; - width: 100%; - height: auto; - border-right: none; - border-bottom: 1px solid var(--border-color); - } - - .sidebar ul { - flex-direction: row; - flex-wrap: wrap; - } - - .content { - margin-left: 0; - } - - .diagram { - min-width: 100%; - } -} diff --git a/docs/index.html b/docs/index.html index 413af1a..99e6598 100644 --- a/docs/index.html +++ b/docs/index.html @@ -1,380 +1,564 @@ - + - - - - MPR - Architecture - - - - + + + +MPR — Detection Pipeline Architecture + + + + +
+

MPR

+ Media Processing & Detection Pipeline — Architecture + +
+ +
+ + + + +
+ +
+

OVERVIEW

+

A guided tour of the platform — start here for narrative context before the diagrams.

+
+ +

What MPR is

+

MPR is a brand / logo / text detection pipeline for video. A user picks chunks of source material into a Timeline, then runs a Profile (pipeline topology + per-stage config) against it. The pipeline extracts frames, filters scenes, runs CV (field segmentation, edge detection) and detection (YOLO, OCR), resolves text to a session brand list, and escalates anything still unresolved to a local VLM and then to cloud VLM providers. Output is a brand timeline and per-brand stats.

+ +

Where things run

+

The architecture spans four boxes: the browser (Vue 3 detection-app + OpenCV WASM worker for fast CV iteration), the K8s cluster (Envoy Gateway, FastAPI, detection-ui, Postgres, Redis, MinIO — Kind in dev via Tilt), a separate GPU host on the LAN running the inference server (YOLO, OCR, local VLM), and cloud VLM providers (Anthropic, Gemini, OpenAI, Groq) for last-resort escalation. See System.

+ +

Replay loop

+

The system is built around iteration. Checkpoint rows form a tree of "what configs did we try at this stage" (no blobs); StageOutput is a flat upsert table holding each stage's output dict. A single stage can be re-run in place using upstream StageOutput rows, so the UI loop is "tweak config → replay one stage → look at the overlay" without rerunning the whole pipeline. Frame caches keyed by timeline_id are reused across replays.

+ +

Profiles, not overrides

+

Profiles live in Postgres as two JSONB blobs — pipeline (stages + edges + routing) and configs (per-stage parameters). The convention is to duplicate a profile and tweak it, not to layer overrides at the call site. Job-level config_overrides exist but are merged on top of the resolved profile in core/detect/graph/nodes.py.

+ +

Inference indirection

+

Every CV/ML stage takes an INFERENCE_URL argument. Empty (the dev default) runs CV in-process; set, the stage POSTs to core/gpu/server.py on the GPU host. Heavy ML deps (torch, transformers, paddleocr) live only in core/gpu/pyproject.toml — the API host doesn't need them.

+ +

API and SSE

+

FastAPI under /detect/* (core/api/detect/): sources, run/stop/pause/resume/step, status, replay, checkpoints, overlays, config. Pipeline events fan out through Redis to GET /detect/stream/{job_id} as SSE. Envoy keeps the SSE connection open for up to 3600s.

+ +

Codegen

+

Source-of-truth dataclasses live in core/schema/models/. The standalone modelgen tool emits SQLModel ORM (core/db/models.py), Pydantic schemas, TypeScript types, and Protobuf definitions. Regenerate everything with bash ctrl/generate.sh.

+ +
+
+ +
+

SYSTEM ARCHITECTURE

+

Browser ↔ Envoy Gateway ↔ FastAPI / detection-ui ↔ data plane (Postgres / Redis / MinIO) ↔ LAN GPU host ↔ cloud VLM providers.

+
+ System Architecture +
+
+ Browser + K8s cluster + GPU host (LAN) + Cloud VLM +
+
+ +
+

DETECTION PIPELINE

+

11 named stages from core/detect/graph/nodes.py. The runner flattens the profile's PipelineConfig graph into a linear sequence and runs each stage with cancel / pause / resume / step control.

+
+ Detection Pipeline +
+
+ Browser / WASM-eligible + GPU inference + Cloud VLM +
+
+

Control flow. Each stage runs inside trace_node(), emits runningdone/skipped via core/detect/emit.py, and writes its result to a StageOutput row keyed by (job_id, stage_name). Between stages the runner checks three job-keyed flags: cancel (set_cancel_check), pause/resume (threading.Event), and pause-after-stage / step.

+
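A minimal sketch of that between-stage check. Only PipelineCancelled, set_cancel_check, and the per-job threading.Event are named in the source; class and attribute names here are assumptions, and the real loop lives in core/detect/graph/runner.py.

# Illustrative sketch only, not the actual runner.
import threading

class PipelineCancelled(Exception):
    """Raised between stages to abort the run."""

class Runner:
    def __init__(self, stages, cancel_check):
        self.stages = stages                   # flattened stage callables
        self.cancel_check = cancel_check       # fn() -> bool, registered per job
        self.resume_event = threading.Event()  # cleared means paused
        self.resume_event.set()
        self.pause_after_stage = False         # "step" sets this, then resumes

    def run(self, state):
        for stage in self.stages:
            if self.cancel_check():
                raise PipelineCancelled()
            self.resume_event.wait()           # block while paused
            state = stage(state)               # stage emits running -> done/skipped
            if self.pause_after_stage:
                self.resume_event.clear()      # auto-pause before the next stage
        return state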

Skip flags. SKIP_VLM=1 emits skipped for escalate_vlm; SKIP_CLOUD=1 for escalate_cloud. Useful in CI and dev when you don't want to burn provider credits.

+

Full pipeline reference →

+
+
+ +
+

PROFILES & CHECKPOINTS

+

Profiles are the config mechanism; checkpoints + StageOutput power the replay loop.

+
+ +

Profile shape

+

One Profile row per content type (e.g. soccer_broadcast) holds two JSONB blobs:

+
    +
  • pipeline — a PipelineConfig: stages + edges + routing rules. The runner topologically sorts the edges, falling back to stage order when no edges are defined.
  • +
  • configs — {stage_name: {...}} per-stage parameters: fps, thresholds, prompts, etc. Each stage parses its slice into a typed config (FrameExtractionConfig, OCRConfig, ...).
  • +
+

Convention: duplicate a profile and tweak it rather than patching defaults at the call site. Job-level config_overrides exist for one-off experiments but the resolved profile is the durable artifact.

+ +
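For illustration, the two blobs and the job-level merge might look roughly like this. The extract_frames and match_brands names appear elsewhere in these docs; the other stage name, the config keys, and the shallow-merge behaviour are assumptions.

# Illustrative shape only; real profiles are JSONB rows in Postgres.
profile = {
    "pipeline": {
        "stages": ["extract_frames", "run_ocr", "match_brands"],
        "edges": [["extract_frames", "run_ocr"], ["run_ocr", "match_brands"]],
    },
    "configs": {
        "extract_frames": {"fps": 2},
        "run_ocr": {"min_confidence": 0.6},
        "match_brands": {"threshold": 0.8},
    },
}

# Job-level overrides are merged on top of the resolved profile (shallow merge assumed)
config_overrides = {"match_brands": {"threshold": 0.9}}
resolved = {
    stage: {**profile["configs"].get(stage, {}), **config_overrides.get(stage, {})}
    for stage in profile["pipeline"]["stages"]
}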

Checkpoint tree

+

A Checkpoint row is a tree node: (parent_id, stage_name, config_overrides, stats). No blobs. Lets the UI show a branching history of "what configs did we try at this stage" without dragging frame data around.

+ +

StageOutput (flat upsert)

+

One row per (job_id, stage_name) holding the stage's output dict. Single-stage replay reads upstream outputs from here, so re-running match_brands with a tweaked threshold doesn't redo OCR. POST /replay-stage is the entry point.

+ +
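Driving that from a script might look like the sketch below. The endpoint and the gateway base URL come from these docs; the request body fields are assumptions, so check core/api/detect/replay.py for the actual schema.

# Illustrative sketch only; field names are assumptions.
import requests

BASE = "http://k8s.mpr.local.ar:8080/api/detect"

resp = requests.post(
    f"{BASE}/replay-stage",
    json={
        "job_id": "job-123",                    # existing job with StageOutput rows
        "stage": "match_brands",                # stage to re-run in place
        "config_overrides": {"threshold": 0.9}, # tweak without touching the profile
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json())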

Replay loop

+

The detection-app UI is the test surface: change a config, replay one stage, see the overlay rendered from the cached frame plus the new StageOutput. Frame caches keyed by timeline_id survive across replays — extract_frames only fires on the first run for a timeline.

+ +
+
+ +
+

INFERENCE TOPOLOGY

+

Stages can run in three places. The split is what keeps the dev box light and lets one GPU host serve the whole team.

+
+ +

Browser (OpenCV WASM)

+

Field and edge stages can run in a Web Worker via ui/detection-app/src/cv/wasmBridge.ts using OpenCV WASM directly — no TypeScript ports of the algorithms. This is the fast-iteration path for the replay loop: tweak a kernel size, rerun the stage on the cached frames, see the overlay update without touching a server.

+ +

API host (in-process)

+

With INFERENCE_URL="" (the dev default in ctrl/k8s/base/configmap.yaml) every CV/ML stage calls its routine in-process. Useful when there's no GPU host wired up; works for everything except heavy YOLO/VLM workloads.

+ +

GPU host (LAN)

+

Set INFERENCE_URL=http://gpu-host:8000 and the same stages POST to core/gpu/server.py. The GPU server exposes /detect, /ocr, /preprocess, /vlm, /detect_edges, /segment_field — each with a /debug variant that returns intermediate masks for the overlay viewer. Heavy ML deps live only in core/gpu/pyproject.toml; the API host doesn't import torch.

+ +
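The in-process / remote split can be pictured like this. It is a sketch under assumptions: the real stage code in core/detect/graph/nodes.py is shaped differently, the in-process routine name is invented, and whether the GPU server takes multipart or JSON bodies is not specified here.

# Illustrative only; shows the INFERENCE_URL indirection, not the real stage code.
import os
import requests

INFERENCE_URL = os.environ.get("INFERENCE_URL", "")

def run_ocr(frame_bytes: bytes) -> dict:
    if not INFERENCE_URL:
        # Dev default: call the CV/ML routine in-process on the API host
        return local_ocr(frame_bytes)
    # Otherwise POST to the GPU host (core/gpu/server.py exposes /ocr)
    resp = requests.post(f"{INFERENCE_URL}/ocr",
                         files={"frame": frame_bytes},  # body format assumed
                         timeout=120)
    resp.raise_for_status()
    return resp.json()

def local_ocr(frame_bytes: bytes) -> dict:
    raise NotImplementedError  # stands in for the real in-process implementation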

Cloud VLM providers

+

Last-resort escalation for unresolved candidates. core/detect/providers/ wraps Anthropic, Gemini, OpenAI, and Groq. Selection is per-profile config; SKIP_CLOUD=1 bypasses the stage entirely.

+ +
+
+ +
+

DATA MODEL

+

Tables generated by modelgen from core/schema/models/ into core/db/models.py (SQLModel).

+
+ Data Model +
+
+
    +
  • MediaAsset — source video file with probe metadata (duration, fps, codec).
  • +
  • Profile — pipeline topology + per-stage config (JSONB).
  • +
  • Timeline — user-created selection of chunks from a source asset.
  • +
  • Job — one pipeline run on a timeline; parent_id chains replays into a tree.
  • +
  • Checkpoint — tree node of stage state, no blobs.
  • +
  • StageOutput — flat upsert per (job, stage), holds output JSONB and an optional checkpoint_id.
  • +
  • Brand — canonical name, aliases, source (ocr/local_vlm/cloud_llm/manual), airing history.
  • +
+
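As a rough picture of what modelgen emits for one of these tables, a SQLModel sketch of StageOutput is below. Column names follow the description above; types, defaults, and the primary key are assumptions.

# Illustrative only; the generated table lives in core/db/models.py.
from typing import Optional
from sqlmodel import SQLModel, Field
from sqlalchemy import Column, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSONB

class StageOutput(SQLModel, table=True):
    """Flat upsert table: one row per (job_id, stage_name)."""
    __table_args__ = (UniqueConstraint("job_id", "stage_name"),)

    id: Optional[int] = Field(default=None, primary_key=True)
    job_id: str = Field(index=True)
    stage_name: str
    output: dict = Field(sa_column=Column(JSONB))   # the stage's output dict
    checkpoint_id: Optional[int] = None             # references Checkpoint.id; FK omitted in this sketch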
+
+ +
+

API

+

FastAPI under /detect/* (mounted from core/api/detect/). Through Envoy Gateway in dev the public path is /api/detect/...; /api/detect/stream/* gets an extended idle timeout for SSE.

+
# Sources / timelines
+GET    /sources
+GET    /sources/{job_id}/chunks
+POST   /timeline
+GET    /timeline
+GET    /timeline/{id}
+DELETE /timeline/{id}/cache
+
+# Run control
+POST   /run
+POST   /stop/{job_id}
+POST   /pause/{job_id}
+POST   /resume/{job_id}
+POST   /step/{job_id}
+POST   /pause-after-stage/{job_id}
+GET    /status/{job_id}
+POST   /clear/{job_id}
+
+# Live events
+GET    /stream/{job_id}              # SSE
+
+# Replay / checkpoints / overlays
+GET    /checkpoints/{timeline_id}
+GET    /checkpoints/{timeline_id}/{stage}
+GET    /scenarios
+POST   /replay
+POST   /replay-stage
+POST   /overlays
+GET    /overlays/{timeline_id}/{job_id}/{stage}/{seq}
+
+# Config
+GET    /config
+PUT    /config
+GET    /config/profiles
+GET    /config/profiles/{name}/pipeline
+PUT    /config/edge-transform
+GET    /config/stages
+GET    /config/stages/{stage_name}
+
+# Jobs
+GET    /jobs
+GET    /jobs/{id}
+
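For example, starting a run and tailing its events from a script might look like this. The endpoints and the gateway base URL come from this page; the request body fields and the event payload shape are assumptions.

# Illustrative sketch only; request/response field names are assumptions.
import json
import requests

BASE = "http://k8s.mpr.local.ar:8080/api/detect"

# Start a job from a timeline + profile
job = requests.post(f"{BASE}/run",
                    json={"timeline_id": "tl-1", "profile": "soccer_broadcast"},
                    timeout=30).json()
job_id = job["job_id"]  # field name assumed

# Tail the SSE stream; each event arrives as a "data: {...}" line
with requests.get(f"{BASE}/stream/{job_id}", stream=True, timeout=3600) as resp:
    for line in resp.iter_lines():
        if line.startswith(b"data: "):
            print(json.loads(line[len(b"data: "):]))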
+ +
+

STORAGE

+

S3-compatible everywhere — MinIO locally, real S3 / GCS / R2 in cloud targets. The same boto3 code path serves both; only S3_ENDPOINT_URL and credentials change.

+
+
    +
  • mpr-media-in — source video files (chunks).
  • +
  • mpr-media-out — per-job artifacts: extracted frame caches, debug overlays.
  • +
+

Heavy artifacts (frames, masks, overlays) live in object storage. Checkpoint and StageOutput rows in Postgres hold structured outputs and references to S3 keys, never blobs.

+
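A sketch of that single code path, with bucket names from the list above; the credential env var names and the object key layout are assumptions.

# Illustrative only; one boto3 client serves MinIO locally and S3/GCS/R2 in cloud targets.
import os
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url=os.environ.get("S3_ENDPOINT_URL") or None,  # e.g. http://localhost:9000 for MinIO
    aws_access_key_id=os.environ.get("S3_ACCESS_KEY"),        # name assumed
    aws_secret_access_key=os.environ.get("S3_SECRET_KEY"),    # name assumed
)

# Frames and overlays go to the output bucket; Postgres rows keep only the key
key = "job-123/overlays/match_brands/0001.png"
s3.upload_file("overlay.png", "mpr-media-out", key)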

Full storage reference →

+
+
+ +
+

CODE GENERATION

+

Source-of-truth dataclasses in core/schema/models/ → typed code in four targets.

+
+
    +
  • SQLModel ORM tables → core/db/models.py
  • +
  • Pydantic schemas (API request / response models)
  • +
  • TypeScript types (UI)
  • +
  • Protobuf definitions (gRPC stubs in core/rpc/)
  • +
+
+
# regenerate everything
+bash ctrl/generate.sh
+
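What a source-of-truth definition looks like is roughly the sketch below. It is a hypothetical example: the Brand fields follow the data model list above, but the actual dataclasses in core/schema/models/ and modelgen's conventions will differ.

# Hypothetical source-of-truth model; modelgen fans this out to the
# SQLModel, Pydantic, TypeScript, and Protobuf targets.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Brand:
    name: str                                   # canonical name
    aliases: list[str] = field(default_factory=list)
    source: str = "ocr"                         # ocr / local_vlm / cloud_llm / manual
    last_aired: Optional[str] = None            # airing-history fields are assumptions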
+ +
+

DEV ENVIRONMENT

+

Tilt + Kind for local dev. Routing via Envoy Gateway on port 8080 — no nginx-ingress.

+
+

The Tiltfile lives at ctrl/Tiltfile and applies the kustomize overlay ctrl/k8s/overlays/dev/. Cluster name: kind-mpr. Tilt port-forwards Envoy (8080) and MinIO (9000 API, 9001 console).

+
    +
  • /api/detect/stream/* → FastAPI SSE (3600s idle timeout)
  • +
  • /api/* → FastAPI
  • +
  • /, /detection/* → detection-ui (with WS upgrade for Vite HMR)
  • +
+
+
# Add to /etc/hosts
+127.0.0.1 mpr.local.ar k8s.mpr.local.ar
+
+# Bring the cluster up
+cd ctrl
+./kind-create.sh           # one-time
+tilt up                    # builds + applies + port-forwards
+
+# UI:    http://k8s.mpr.local.ar:8080/
+# API:   http://k8s.mpr.local.ar:8080/api/
+# MinIO: http://localhost:9001  (console; admin / minioadmin)
+
+# Force a UI rebuild
+tilt trigger detection-ui
+
+ +
+

QUICK REFERENCE

+

Common commands and switches for working in MPR.

+
# Render SVGs from DOT files
 for f in docs/architecture/*.dot; do dot -Tsvg "$f" -o "${f%.dot}.svg"; done
 
-# Switch executor mode
-MPR_EXECUTOR=local    # Celery + MinIO
-MPR_EXECUTOR=lambda   # Step Functions + Lambda + S3
-MPR_EXECUTOR=gcp      # Cloud Run Jobs + GCS
-
- +# Regenerate models from core/schema/models/ +bash ctrl/generate.sh + +# Switch inference between local and GPU host +INFERENCE_URL= # local (CV runs in API process) +INFERENCE_URL=http://gpu-host:8000 # remote (core/gpu/server.py) + +# Skip VLM escalation paths +SKIP_VLM=1 +SKIP_CLOUD=1 + +# Tilt +cd ctrl && tilt up +tilt trigger detection-ui +
+

Reference docs:

+ +
+ + + + +
+ + + + diff --git a/docs/media-storage.html b/docs/media-storage.html deleted file mode 100644 index 4d30a51..0000000 --- a/docs/media-storage.html +++ /dev/null @@ -1,125 +0,0 @@ -

Media Storage Architecture

-

Overview

-

MPR separates media into input and output paths, each independently configurable. File paths are stored relative to their respective root to ensure portability between local development and cloud deployments (AWS S3, etc.).

-

Storage Strategy

-

Input / Output Separation

-

| Path | Env Var | Purpose | -|------|---------|---------| -| MEDIA_IN | /app/media/in | Source media files to process | -| MEDIA_OUT | /app/media/out | Transcoded/trimmed output files |

-

These can point to different locations or even different servers/buckets in production.

-

File Path Storage

- -

Why Relative Paths?

-
    -
  1. Portability: Same database works locally and in cloud
  2. -
  3. Flexibility: Easy to switch between storage backends
  4. -
  5. Simplicity: No need to update paths when migrating
  6. -
-

Local Development

-

Configuration

-

bash -MEDIA_IN=/app/media/in -MEDIA_OUT=/app/media/out

-

File Structure

-

/app/media/ -├── in/ # Source files -│ ├── video1.mp4 -│ ├── video2.mp4 -│ └── subfolder/ -│ └── video3.mp4 -└── out/ # Transcoded output - ├── video1_h264.mp4 - └── video2_trimmed.mp4

-

Database Storage

-

```

-

Source assets (scanned from media/in)

-

filename: video1.mp4 -file_path: video1.mp4

-

filename: video3.mp4 -file_path: subfolder/video3.mp4 -```

-

URL Serving

- -

AWS/Cloud Deployment

-

S3 Configuration

-

```bash

-

Input and output can be different buckets/paths

-

MEDIA_IN=s3://source-bucket/media/ -MEDIA_OUT=s3://output-bucket/transcoded/ -MEDIA_BASE_URL=https://source-bucket.s3.amazonaws.com/media/ -```

-

S3 Structure

-

``` -s3://source-bucket/media/ -├── video1.mp4 -└── subfolder/ - └── video3.mp4

-

s3://output-bucket/transcoded/ -├── video1_h264.mp4 -└── video2_trimmed.mp4 -```

-

Database Storage (Same!)

-

``` -filename: video1.mp4 -file_path: video1.mp4

-

filename: video3.mp4 -file_path: subfolder/video3.mp4 -```

-

API Endpoints

-

Scan Media Folder

-

http -POST /api/assets/scan

-

Behavior: -1. Recursively scans MEDIA_IN directory -2. Finds all video/audio files (mp4, mkv, avi, mov, mp3, wav, etc.) -3. Stores paths relative to MEDIA_IN -4. Skips already-registered files (by filename) -5. Returns summary: { found, registered, skipped, files }

-

Create Job

-

```http -POST /api/jobs/ -Content-Type: application/json

-

{ - "source_asset_id": "uuid", - "preset_id": "uuid", - "trim_start": 10.0, - "trim_end": 30.0 -} -```

-

Behavior: -- Server sets output_path using MEDIA_OUT + generated filename -- Output goes to the output directory, not alongside source files

-

Migration Guide

-

Moving from Local to S3

-
    -
  1. -

    Upload source files to S3: - bash - aws s3 sync /app/media/in/ s3://source-bucket/media/ - aws s3 sync /app/media/out/ s3://output-bucket/transcoded/

    -
  2. -
  3. -

    Update environment variables: - bash - MEDIA_IN=s3://source-bucket/media/ - MEDIA_OUT=s3://output-bucket/transcoded/ - MEDIA_BASE_URL=https://source-bucket.s3.amazonaws.com/media/

    -
  4. -
  5. -

    Database paths remain unchanged (already relative)

    -
  6. -
-

Supported File Types

-

Video: .mp4, .mkv, .avi, .mov, .webm, .flv, .wmv, .m4v -Audio: .mp3, .wav, .flac, .aac, .ogg, .m4a

\ No newline at end of file diff --git a/docs/viewer.html b/docs/viewer.html new file mode 100644 index 0000000..f2869e6 --- /dev/null +++ b/docs/viewer.html @@ -0,0 +1,97 @@ + + + + +Graph Viewer + + + +
+ +
+ + +