improve frontend and celery task architecture
This commit is contained in:
269
def/improvements/celery_log
Normal file
@@ -0,0 +1,269 @@
I think this could be improved: there is repeated configuration stuff in the Celery messages (whether to use CLIP or Google Vision) that shouldn't be in the queued messages for analysis. That is a straight backend/frontend communication thing; workers should be notified when there's a change, or just read whatever is configured. Maybe that is what it does, but the whole logging for each message feels too verbose. Also, maybe there should be different queues for logo and image properties, I don't know. It feels messy. Don't do anything yet; tell me what you think.
✻ Thinking…

The user is making very valid points about the current implementation before moving to production. Let me analyze what they're pointing out:

1. Configuration repetition in Celery messages: Looking at the logs they showed earlier, I can see that every Celery task is:
- Loading and checking all providers from the database
- Reconfiguring the analysis engine every time
- Logging all the provider information repeatedly

2. Provider selection logic: The task is checking "if using CLIP or Google Vision" every single time, when this should be a configuration that's set once and cached.

3. Verbose logging: Too much debug info in production logs.

4. Queue separation: Right now everything goes through one queue, but they could have separate queues for different types of analysis (logo detection, visual properties, etc.)

Let me think about what the clean architecture should look like:

Current problems:
- Provider configuration is loaded from DB on every task execution
- Engine is reconfigured from scratch every time
- Logging is too verbose for production
- Single queue for all analysis types
- No caching of provider configurations

Better architecture:
- Provider configuration should be loaded once at startup or cached with invalidation
- Engine should be configured once per worker, not per task
- Separate queues for different analysis types
- Cleaner logging levels
- Configuration changes should trigger worker restarts or cache invalidation

This is exactly the kind of architectural cleanup needed before going to production/K8s.
● You're absolutely right! The current implementation has several architectural issues that need fixing before production. Let me break down what I see:

Current Problems

1. Configuration Overhead
- Every Celery task loads providers from the DB
- Engine reconfigured from scratch each time
- Provider selection logic runs repeatedly
- Lots of "Found 2 total providers" spam

2. Poor Separation of Concerns
- Single queue handling all analysis types
- Mixed responsibilities (config + processing)
- No caching of provider settings

3. Production-Unfriendly Logging
- Too verbose for production
- Debug info cluttering worker logs
- Configuration details logged on every task

Better Architecture

Configuration Management:
- Load provider config once at worker startup
- Cache active providers in Redis/memory
- Only reload config when explicitly changed
- Clean separation of config vs processing
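A minimal sketch of what that caching could look like at the worker level. The `load_provider_config()` function is a hypothetical stand-in for the real DB query; the TTL fallback is one possible policy, not the project's actual behavior:

```python
# Per-worker-process provider-config cache (sketch). Instead of querying the
# DB and reconfiguring the engine inside every task, each worker loads the
# config once and reuses it until it is invalidated (or a TTL expires).
import time
from typing import Optional

_cached_config: Optional[dict] = None
_cached_at: float = 0.0
CONFIG_TTL_SECONDS = 300  # periodic refresh fallback, even without signals

def load_provider_config() -> dict:
    """Hypothetical stand-in for the real DB query that lists providers."""
    return {
        "logo_detection": {
            "provider_type": "local_clip",
            "model_identifier": "openai/clip-vit-base-patch32",
        }
    }

def get_engine_config(force_reload: bool = False) -> dict:
    """Return the cached config, reloading only when stale or forced."""
    global _cached_config, _cached_at
    expired = (time.monotonic() - _cached_at) > CONFIG_TTL_SECONDS
    if force_reload or _cached_config is None or expired:
        _cached_config = load_provider_config()
        _cached_at = time.monotonic()
    return _cached_config

def invalidate_config() -> None:
    """Called when the admin changes provider settings."""
    global _cached_config
    _cached_config = None
```

Tasks would call `get_engine_config()` instead of hitting the DB; a Celery `worker_process_init` signal handler could warm the cache once at worker startup.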

Queue Separation:
logo_detection_queue   -> only logo analysis
visual_analysis_queue  -> brightness, colors, etc.
object_detection_queue -> future YOLO objects
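The queue split above maps naturally onto Celery's `task_routes` setting. A sketch as a plain mapping (the per-analysis task names are hypothetical; today the log shows a single `process_video_segment` task):

```python
# Candidate task_routes mapping (would be assigned to app.conf.task_routes).
# Task names under ai_processing.tasks.* are assumed, not existing code.
TASK_ROUTES = {
    "ai_processing.tasks.detect_logos": {"queue": "logo_detection_queue"},
    "ai_processing.tasks.analyze_visual_properties": {"queue": "visual_analysis_queue"},
    "ai_processing.tasks.detect_objects": {"queue": "object_detection_queue"},
}

def queue_for(task_name: str, default: str = "celery") -> str:
    """Resolve which queue a task name would be routed to."""
    return TASK_ROUTES.get(task_name, {}).get("queue", default)

# Each worker then subscribes to one queue, e.g.:
#   celery -A ai_processing worker -Q logo_detection_queue
```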

Worker Specialization:
- Logo detection workers (CLIP-focused)
- Visual analysis workers (OpenCV-focused)
- Different scaling profiles per queue type

Configuration Flow:
- Admin changes provider settings → triggers config reload
- Workers cache config until reload signal
- Clean task payloads with minimal data
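The reload-signal flow could be sketched with a shared config version that workers check cheaply on each task and reload only when it changes. A plain dict stands in for Redis here; the key and class names are hypothetical:

```python
# Version-based config invalidation (sketch). The backend bumps a version
# when the admin saves provider settings; workers compare their cached
# version per task and only do the expensive reload on change.
SHARED = {"provider_config_version": 1}  # e.g. a Redis key in production

class ConfigHolder:
    def __init__(self, loader):
        self._loader = loader     # callable that does the expensive load
        self._version = None
        self._config = None

    def get(self) -> dict:
        current = SHARED["provider_config_version"]
        if current != self._version:      # cheap check on every task
            self._config = self._loader() # expensive reload, rare
            self._version = current
        return self._config

def bump_version() -> None:
    """Called by the admin endpoint after saving provider settings."""
    SHARED["provider_config_version"] += 1
```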

Logging Levels:
- DEBUG: detailed analysis info
- INFO: task completion only
- ERROR: actual failures
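That level split might look like this with the standard `logging` module (the logger name and helper are illustrative, not existing code):

```python
# Move per-frame detail to DEBUG and keep only task completion at INFO, so
# production workers running at INFO no longer emit the per-task config spam.
import logging

logger = logging.getLogger("ai_processing.tasks")

def log_segment_result(segment_path: str, detections: int, details: dict) -> None:
    logger.debug("Analysis results: %s", details)  # hidden in production
    logger.info("Processed %s: %d detections", segment_path, detections)

# In production: celery ... --loglevel=INFO (or WARNING), and optionally
# logging.getLogger("ai_processing.tasks").setLevel(logging.INFO)
```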

This cleanup is essential before K8s deployment, where you'll have multiple worker pods and need clean scaling/monitoring.

Want me to refactor this architecture now, or should we plan it out more first?

[2025-08-18 16:37:55,910: INFO/MainProcess] mingle: searching for neighbors
[2025-08-18 16:37:56,915: INFO/MainProcess] mingle: all alone
[2025-08-18 16:37:56,924: INFO/MainProcess] celery@mcrn2 ready.
[2025-08-18 16:37:56,927: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[cde6573e-6d03-4904-b042-4e29a86652b7] received
[2025-08-18 16:37:56,929: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[1e4ed524-cdd7-45c5-b599-0c8ece4c5c14] received
[2025-08-18 16:37:56,930: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[5fec033e-3c12-4327-b342-3d49a966b516] received
[2025-08-18 16:37:56,931: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[be45792e-fca0-4c83-8f9d-2baa42bf307b] received
[2025-08-18 16:37:56,932: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[a38470b0-ca68-4038-84fc-f077ee4cdfd3] received
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-4] Found 2 total providers:
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-2] Found 2 total providers:
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-1] Found 2 total providers:
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-3] Found 2 total providers:
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-16] Found 2 total providers:
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-4] - Local CLIP: local_clip (active: True)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-2] - Local CLIP: local_clip (active: True)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-4] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-1] - Local CLIP: local_clip (active: True)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-3] - Local CLIP: local_clip (active: True)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-2] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-16] - Local CLIP: local_clip (active: True)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-1] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-3] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:37:56,945: INFO/ForkPoolWorker-16] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-1] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-4] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-1] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-16] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-4] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-16] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-1] Engine configuration completed
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-2] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-3] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-1] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-8.ts
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-4] Engine configuration completed
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-16] Engine configuration completed
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-3] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-2] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-16] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-7.ts
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-4] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-11.ts
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-3] Engine configuration completed
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-2] Engine configuration completed
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-3] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-10.ts
[2025-08-18 16:37:56,946: INFO/ForkPoolWorker-2] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-9.ts
[h264 @ 0x3c4d2d00] [h264 @ 0x3c4d3300] non-existing SPS 0 referenced in buffering period
non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2200] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2200] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2540] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d3300] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2d00] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2200] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2200] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2540] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:37:57,029: INFO/ForkPoolWorker-3] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:37:57,029: INFO/ForkPoolWorker-1] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:37:57,030: INFO/ForkPoolWorker-3] Starting frame analysis...
[2025-08-18 16:37:57,030: INFO/ForkPoolWorker-1] Starting frame analysis...
[2025-08-18 16:37:57,030: INFO/ForkPoolWorker-16] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:37:57,030: INFO/ForkPoolWorker-16] Starting frame analysis...
[2025-08-18 16:37:57,030: INFO/ForkPoolWorker-2] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:37:57,031: INFO/ForkPoolWorker-2] Starting frame analysis...
[2025-08-18 16:37:57,031: INFO/ForkPoolWorker-4] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:37:57,031: INFO/ForkPoolWorker-4] Starting frame analysis...
[2025-08-18 16:38:01,613: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[a35f6a46-37d4-4c21-9031-6d94ba336bd7] received
[2025-08-18 16:38:01,642: INFO/ForkPoolWorker-5] Found 2 total providers:
[2025-08-18 16:38:01,645: INFO/ForkPoolWorker-5] - Local CLIP: local_clip (active: True)
[2025-08-18 16:38:01,645: INFO/ForkPoolWorker-5] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:38:01,648: INFO/ForkPoolWorker-5] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:38:01,648: INFO/ForkPoolWorker-5] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:38:01,648: INFO/ForkPoolWorker-5] Engine configuration completed
[2025-08-18 16:38:01,649: INFO/ForkPoolWorker-5] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-12.ts
[h264 @ 0x3c4d2200] non-existing SPS 0 referenced in buffering period
[h264 @ 0x3c4d2200] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:38:01,760: INFO/ForkPoolWorker-5] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:38:01,760: INFO/ForkPoolWorker-5] Starting frame analysis...
[2025-08-18 16:38:02,265: WARNING/ForkPoolWorker-4] Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
[2025-08-18 16:38:02,267: WARNING/ForkPoolWorker-2] Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
[2025-08-18 16:38:02,292: WARNING/ForkPoolWorker-3] Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
[2025-08-18 16:38:02,326: WARNING/ForkPoolWorker-16] Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
[2025-08-18 16:38:02,340: WARNING/ForkPoolWorker-1] Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
[2025-08-18 16:38:06,140: WARNING/ForkPoolWorker-5] Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
[2025-08-18 16:38:09,260: INFO/ForkPoolWorker-4] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.4090644121170044, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Apple', 'confidence': 0.3697817027568817, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[20, 21, 24], [173, 172, 174], [54, 55, 64]], 'brightness_level': 0.1390595900911805, 'contrast_level': 0.12830904267259136, 'saturation_level': 0.13596454021423385}}
[2025-08-18 16:38:09,279: INFO/ForkPoolWorker-4] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-11.ts: 2 detections
[2025-08-18 16:38:09,298: INFO/ForkPoolWorker-4] Task ai_processing.tasks.process_video_segment[a38470b0-ca68-4038-84fc-f077ee4cdfd3] succeeded in 12.365913566900417s: {'detections': 2, 'analysis_id': '116a81bc-b689-4f31-8fcd-08ebb3ab63c5'}
[2025-08-18 16:38:09,661: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[8a93b9fd-33e1-412a-a7e4-7bfa25127f79] received
[2025-08-18 16:38:09,686: INFO/ForkPoolWorker-4] Found 2 total providers:
[2025-08-18 16:38:09,687: INFO/ForkPoolWorker-4] - Local CLIP: local_clip (active: True)
[2025-08-18 16:38:09,687: INFO/ForkPoolWorker-4] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:38:09,688: INFO/ForkPoolWorker-4] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:38:09,688: INFO/ForkPoolWorker-4] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:38:09,688: INFO/ForkPoolWorker-4] Engine configuration completed
[2025-08-18 16:38:09,688: INFO/ForkPoolWorker-4] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-13.ts
[h264 @ 0x445059c0] non-existing SPS 0 referenced in buffering period
[h264 @ 0x445059c0] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:38:09,753: INFO/ForkPoolWorker-4] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:38:09,753: INFO/ForkPoolWorker-4] Starting frame analysis...
[2025-08-18 16:38:13,727: INFO/ForkPoolWorker-3] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.42210501432418823, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Apple', 'confidence': 0.3576214611530304, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[20, 21, 24], [172, 172, 174], [54, 55, 64]], 'brightness_level': 0.13935481912672476, 'contrast_level': 0.128542122700099, 'saturation_level': 0.1362288338174776}}
[2025-08-18 16:38:13,753: INFO/ForkPoolWorker-3] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-10.ts: 2 detections
[2025-08-18 16:38:13,775: INFO/ForkPoolWorker-3] Task ai_processing.tasks.process_video_segment[be45792e-fca0-4c83-8f9d-2baa42bf307b] succeeded in 16.843556493055075s: {'detections': 2, 'analysis_id': 'd3f2f585-822c-46b7-b2a4-d404be6bfe5c'}
[2025-08-18 16:38:15,519: INFO/ForkPoolWorker-2] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.40869590640068054, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Apple', 'confidence': 0.36340251564979553, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[20, 21, 24], [174, 173, 175], [54, 55, 64]], 'brightness_level': 0.13968061229121279, 'contrast_level': 0.12943472543277817, 'saturation_level': 0.13624170902929073}}
[2025-08-18 16:38:15,544: INFO/ForkPoolWorker-2] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-9.ts: 2 detections
[2025-08-18 16:38:15,569: INFO/ForkPoolWorker-2] Task ai_processing.tasks.process_video_segment[5fec033e-3c12-4327-b342-3d49a966b516] succeeded in 18.638674875022843s: {'detections': 2, 'analysis_id': '84408ae9-b4f4-42f1-bdfa-357aece644d4'}
[2025-08-18 16:38:16,317: INFO/ForkPoolWorker-1] Analysis results: {'logos': [{'label': 'Apple', 'confidence': 0.39591965079307556, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Google', 'confidence': 0.36241379380226135, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[20, 21, 24], [173, 173, 175], [54, 55, 64]], 'brightness_level': 0.1394802358186073, 'contrast_level': 0.12914930695311738, 'saturation_level': 0.13601520893851368}}
[2025-08-18 16:38:16,333: INFO/ForkPoolWorker-1] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-8.ts: 2 detections
[2025-08-18 16:38:16,354: INFO/ForkPoolWorker-1] Task ai_processing.tasks.process_video_segment[1e4ed524-cdd7-45c5-b599-0c8ece4c5c14] succeeded in 19.42475356196519s: {'detections': 2, 'analysis_id': '07c4774d-7c73-4081-9de6-ff3c837a5d48'}
[2025-08-18 16:38:16,575: INFO/ForkPoolWorker-16] Analysis results: {'logos': [{'label': 'Apple', 'confidence': 0.4048289358615875, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Google', 'confidence': 0.35931655764579773, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[20, 21, 24], [173, 172, 175], [54, 55, 64]], 'brightness_level': 0.13938966679778908, 'contrast_level': 0.12867553157407957, 'saturation_level': 0.13664221927196804}}
[2025-08-18 16:38:16,587: INFO/ForkPoolWorker-16] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-7.ts: 2 detections
[2025-08-18 16:38:16,609: INFO/ForkPoolWorker-16] Task ai_processing.tasks.process_video_segment[cde6573e-6d03-4904-b042-4e29a86652b7] succeeded in 19.680853400961496s: {'detections': 2, 'analysis_id': '15ab48fa-3074-4361-b8d4-f03c612faee4'}
[2025-08-18 16:38:17,215: INFO/ForkPoolWorker-5] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.5754773020744324, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[21, 21, 25], [174, 174, 177], [58, 60, 77]], 'brightness_level': 0.13535354446562575, 'contrast_level': 0.1363023302612117, 'saturation_level': 0.16533522830428468}}
[2025-08-18 16:38:17,228: INFO/ForkPoolWorker-5] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-12.ts: 1 detections
[2025-08-18 16:38:17,244: INFO/ForkPoolWorker-5] Task ai_processing.tasks.process_video_segment[a35f6a46-37d4-4c21-9031-6d94ba336bd7] succeeded in 15.629837638000026s: {'detections': 1, 'analysis_id': '77f309e6-8af4-4e13-a40e-ca95843a42c0'}
[2025-08-18 16:38:17,294: INFO/ForkPoolWorker-4] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.6661646962165833, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[176, 175, 177], [60, 58, 69], [1, 1, 2]], 'brightness_level': 0.0801338684136206, 'contrast_level': 0.18819245811973462, 'saturation_level': 0.23137383313664972}}
[2025-08-18 16:38:17,305: INFO/ForkPoolWorker-4] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-13.ts: 1 detections
[2025-08-18 16:38:17,324: INFO/ForkPoolWorker-4] Task ai_processing.tasks.process_video_segment[8a93b9fd-33e1-412a-a7e4-7bfa25127f79] succeeded in 7.6547317589866s: {'detections': 1, 'analysis_id': '024aaab0-61ba-4702-a2ca-c1ec9992fe48'}
[2025-08-18 16:38:17,611: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[8d243811-b3f3-4588-a035-0f5df6f8d1e5] received
[2025-08-18 16:38:17,620: INFO/ForkPoolWorker-16] Found 2 total providers:
[2025-08-18 16:38:17,620: INFO/ForkPoolWorker-16] - Local CLIP: local_clip (active: True)
[2025-08-18 16:38:17,620: INFO/ForkPoolWorker-16] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:38:17,621: INFO/ForkPoolWorker-16] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:38:17,621: INFO/ForkPoolWorker-16] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:38:17,621: INFO/ForkPoolWorker-16] Engine configuration completed
[2025-08-18 16:38:17,621: INFO/ForkPoolWorker-16] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-14.ts
[h264 @ 0x44738e00] non-existing SPS 0 referenced in buffering period
[h264 @ 0x44738e00] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:38:17,681: INFO/ForkPoolWorker-16] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:38:17,681: INFO/ForkPoolWorker-16] Starting frame analysis...
[2025-08-18 16:38:22,265: INFO/ForkPoolWorker-16] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.3460542857646942, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Apple', 'confidence': 0.319761723279953, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[147, 147, 154], [2, 2, 2], [71, 47, 56]], 'brightness_level': 0.08447971952412651, 'contrast_level': 0.16374765632789706, 'saturation_level': 0.13736099574860808}}
[2025-08-18 16:38:22,275: INFO/ForkPoolWorker-16] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-14.ts: 2 detections
[2025-08-18 16:38:22,296: INFO/ForkPoolWorker-16] Task ai_processing.tasks.process_video_segment[8d243811-b3f3-4588-a035-0f5df6f8d1e5] succeeded in 4.684708549058996s: {'detections': 2, 'analysis_id': 'ffcd58b6-ec46-42bd-aadf-41caf8a231c4'}
[2025-08-18 16:38:25,611: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[dbcd3dfc-dbd7-452f-83b8-73e4744220d5] received
[2025-08-18 16:38:25,621: INFO/ForkPoolWorker-16] Found 2 total providers:
[2025-08-18 16:38:25,622: INFO/ForkPoolWorker-16] - Local CLIP: local_clip (active: True)
[2025-08-18 16:38:25,622: INFO/ForkPoolWorker-16] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:38:25,623: INFO/ForkPoolWorker-16] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:38:25,623: INFO/ForkPoolWorker-16] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:38:25,623: INFO/ForkPoolWorker-16] Engine configuration completed
[2025-08-18 16:38:25,623: INFO/ForkPoolWorker-16] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-15.ts
[h264 @ 0x45e13c00] non-existing SPS 0 referenced in buffering period
[h264 @ 0x45e13c00] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:38:25,669: INFO/ForkPoolWorker-16] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:38:25,669: INFO/ForkPoolWorker-16] Starting frame analysis...
[2025-08-18 16:38:30,066: INFO/ForkPoolWorker-16] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.5520480275154114, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}, {'label': 'Apple', 'confidence': 0.3003958761692047, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[179, 178, 181], [73, 77, 104], [23, 23, 27]], 'brightness_level': 0.1378174454581215, 'contrast_level': 0.13558354321945026, 'saturation_level': 0.15619522928770274}}
[2025-08-18 16:38:30,076: INFO/ForkPoolWorker-16] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-15.ts: 2 detections
[2025-08-18 16:38:30,097: INFO/ForkPoolWorker-16] Task ai_processing.tasks.process_video_segment[dbcd3dfc-dbd7-452f-83b8-73e4744220d5] succeeded in 4.485186079982668s: {'detections': 2, 'analysis_id': 'cc8250de-518b-45d0-917c-895cb1240545'}
[2025-08-18 16:38:33,569: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[cf3fe764-c8e1-467b-bfc4-17d11443e7e4] received
[2025-08-18 16:38:33,578: INFO/ForkPoolWorker-16] Found 2 total providers:
[2025-08-18 16:38:33,578: INFO/ForkPoolWorker-16] - Local CLIP: local_clip (active: True)
[2025-08-18 16:38:33,578: INFO/ForkPoolWorker-16] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:38:33,579: INFO/ForkPoolWorker-16] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:38:33,579: INFO/ForkPoolWorker-16] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:38:33,579: INFO/ForkPoolWorker-16] Engine configuration completed
[2025-08-18 16:38:33,579: INFO/ForkPoolWorker-16] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-16.ts
[h264 @ 0x44738e00] non-existing SPS 0 referenced in buffering period
[h264 @ 0x44738e00] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:38:33,624: INFO/ForkPoolWorker-16] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:38:33,624: INFO/ForkPoolWorker-16] Starting frame analysis...
[2025-08-18 16:38:38,435: INFO/ForkPoolWorker-16] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.5440018177032471, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[72, 77, 103], [178, 177, 180], [23, 22, 27]], 'brightness_level': 0.13682642293028321, 'contrast_level': 0.1367056198691002, 'saturation_level': 0.16388074353969984}}
[2025-08-18 16:38:38,444: INFO/ForkPoolWorker-16] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-16.ts: 1 detections
[2025-08-18 16:38:38,464: INFO/ForkPoolWorker-16] Task ai_processing.tasks.process_video_segment[cf3fe764-c8e1-467b-bfc4-17d11443e7e4] succeeded in 4.894647152977996s: {'detections': 1, 'analysis_id': '19b631d3-36ab-4799-a5a6-b632590cda54'}
[2025-08-18 16:38:41,618: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[2ce8e89e-046c-4ded-860f-addaa5bd7fa0] received
[2025-08-18 16:38:41,629: INFO/ForkPoolWorker-16] Found 2 total providers:
[2025-08-18 16:38:41,630: INFO/ForkPoolWorker-16] - Local CLIP: local_clip (active: True)
[2025-08-18 16:38:41,630: INFO/ForkPoolWorker-16] - Google Cloud Vision: gcp_vision (active: False)
[2025-08-18 16:38:41,630: INFO/ForkPoolWorker-16] Found CLIP provider: Local CLIP (active: True)
[2025-08-18 16:38:41,630: INFO/ForkPoolWorker-16] Configuring engine with config: {'logo_detection': {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32'}}
[2025-08-18 16:38:41,630: INFO/ForkPoolWorker-16] Engine configuration completed
[2025-08-18 16:38:41,630: INFO/ForkPoolWorker-16] Extracting frame from: /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-17.ts
[h264 @ 0x45e13c00] non-existing SPS 0 referenced in buffering period
[h264 @ 0x45e13c00] non-existing SPS 0 referenced in buffering period
[2025-08-18 16:38:41,676: INFO/ForkPoolWorker-16] Frame extracted successfully, size: (1920, 1080)
[2025-08-18 16:38:41,676: INFO/ForkPoolWorker-16] Starting frame analysis...
[2025-08-18 16:38:45,979: INFO/ForkPoolWorker-16] Analysis results: {'logos': [{'label': 'Google', 'confidence': 0.5482607483863831, 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}], 'visual': {'dominant_colors': [[22, 23, 27], [177, 177, 180], [72, 77, 103]], 'brightness_level': 0.13457802728859034, 'contrast_level': 0.13673370563295395, 'saturation_level': 0.22427909215383685}}
[2025-08-18 16:38:45,989: INFO/ForkPoolWorker-16] Processed segment /home/mariano/wdir/snk/media-analyzer/media/43606ec7-786c-4f7d-acf3-95981f9e5ebe-17.ts: 1 detections
[2025-08-18 16:38:46,008: INFO/ForkPoolWorker-16] Task ai_processing.tasks.process_video_segment[2ce8e89e-046c-4ded-860f-addaa5bd7fa0] succeeded in 4.389555325964466s: {'detections': 1, 'analysis_id': '918ac225-e0e8-41f0-82e8-328f4f5ae559'}
[2025-08-18 16:38:49,610: INFO/MainProcess] Task ai_processing.tasks.process_video_segment[536e39ce-e31b-4b06-960d-9d667ff388b4] received
23
def/improvements/image_analysis_outofsync
Normal file
@@ -0,0 +1,23 @@
> there is some delay between the image shown and the detection: when I put the logo in front of the camera, the detection shows before the image. don't correct that now, but good to know

● Good observation! That's a classic real-time streaming issue where the analysis pipeline runs ahead of the video display pipeline.

What's happening:

- Analysis pipeline: Celery processes video segments immediately when they're created
- Display pipeline: the HLS player buffers for smooth playback
- Result: AI detections appear before the video shows that frame

Common causes:

- HLS inherent latency (2-6 seconds typical)
- Browser video buffering
- Network buffering in HLS.js

Potential solutions (for later):

- Sync detections with the video timestamp
- Delay analysis results to match video latency
- Use lower-latency streaming (WebRTC instead of HLS)
- Add timestamp correlation between segments and detections

For now, at least we know the AI pipeline is working fast! The detection accuracy and real-time processing are the main goals achieved. The timing sync is a polish issue we can address later. 📡⏰
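The last idea above, correlating segments with detections, can be sketched as a small pure-Python rule. This is an illustrative assumption, not project code: it assumes fixed-duration segments and a known starting media sequence number (both would really be read from the HLS playlist), stamps each detection with its segment's presentation time, and holds it back until the player reports that time.

```python
def segment_start_time(media_sequence: int, first_sequence: int,
                       target_duration: float) -> float:
    """Presentation time (seconds) at which a segment starts playing."""
    return (media_sequence - first_sequence) * target_duration


def due_detections(pending: list, player_time: float) -> list:
    """Return detections whose segment the player has already reached."""
    return [d for d in pending if d["pts"] <= player_time]


# Example: 4-second segments, playlist starts at sequence 100.
pts = segment_start_time(media_sequence=103, first_sequence=100, target_duration=4.0)
pending = [{"label": "Google", "pts": pts}]
print(due_detections(pending, player_time=10.0))  # → [] (segment starts at 12.0)
print(due_detections(pending, player_time=12.5))  # → [{'label': 'Google', 'pts': 12.0}]
```

The frontend would call something like `due_detections` with the player's current time instead of rendering results the moment the WebSocket delivers them.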
54
def/improvements/speed_accuracy
Normal file
@@ -0,0 +1,54 @@
> accuracy and timing suffered a bit, but I think it's better to have it closer to what will be in production. still, it is very slow; what can be done to improve that? don't change anything. I think this is meant more for batch processing, not realtime

● You're absolutely right! This architecture is indeed more suited for batch processing than real-time. The timing issues you're seeing are fundamental to the current design:

Current Bottlenecks

1. HLS Inherent Latency
- 6-10 second segments + buffering
- Browser video player lag
- Not designed for real-time interaction

2. Processing Pipeline Overhead
- File I/O for each segment
- CLIP model inference (CPU-heavy)
- Database writes + WebSocket sends
- Container networking delays

3. Queue Architecture
- Redis message passing
- Task serialization/deserialization
- Worker startup overhead per task

Real-Time Alternatives (for reference)

WebRTC + Frame Streaming
- Direct browser camera → WebSocket frames
- Skip file storage entirely
- Sub-second latency possible

Edge Processing
- Lighter models (MobileNet vs CLIP)
- Frame sampling (every 5th frame)
- Client-side pre-processing

Streaming Analytics
- Apache Kafka + stream processing
- Continuous analysis pipeline
- Built for real-time data flows

Current System's Sweet Spot

Your architecture excels at:
- Content moderation (analyze uploaded videos)
- Media library analysis (batch process existing content)
- Compliance monitoring (post-process broadcasts)
- Marketing analytics (analyze ad placements over time)

The real-time demo works well enough to show the technology, but you're right that production would likely be batch-oriented for accuracy and reliability.

For K8s, this actually showcases better: batch workloads with auto-scaling based on queue depth is a perfect Kubernetes use case! 🎯
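The "frame sampling (every 5th frame)" idea under Edge Processing is simple to sketch: analyze only every Nth frame instead of all of them. A minimal, framework-free illustration (the frame objects here are placeholders for whatever the decoder yields):

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def sample_every_nth(frames: Iterable[T], n: int) -> Iterator[T]:
    """Yield the 1st, (n+1)-th, (2n+1)-th... frame, skipping the rest."""
    for i, frame in enumerate(frames):
        if i % n == 0:
            yield frame

# A 30 fps input sampled with n=5 leaves 6 frames per second to analyze.
print(list(sample_every_nth(range(10), 5)))  # → [0, 5]
```

The tradeoff is straightforward: inference cost drops by roughly 1/n, at the risk of missing detections that only appear between sampled frames.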
43
def/k8splan/moving_to_k8s
Normal file
@@ -0,0 +1,43 @@
> just keep k8s in mind, but I think it's better to dedicate a session to k8s. anything I should know now? I understand the concepts well and have some practical experience (on the job and running a cluster locally), but it's been a while

● Good to plan ahead! Here are the key things to keep in mind for the K8s session:

K8s-Specific Considerations for Your App

Stateful Components:
- PostgreSQL (needs StatefulSet + PVC)
- Redis (can be a Deployment, but consider persistence)
- Media files storage (shared PVC across workers)

Service Architecture:
- Django ASGI (HTTP + WebSocket)
- Celery workers (separate deployment)
- Nginx (HLS file serving)
- External services (PostgreSQL, Redis)

Scaling Patterns:
- HPA for Celery workers based on queue depth
- Video processing is CPU/memory intensive
- WebSocket connections need session affinity

Storage:
- Shared volume for HLS segments (/media)
- Database persistence
- Consider GCP Cloud Storage integration

Networking:
- Ingress for external access
- Internal services communication
- WebSocket support in ingress

GCP Integration Points:
- GKE cluster
- Cloud Storage for media
- Cloud SQL (optional PostgreSQL upgrade)
- Container Registry for images

The current Docker Compose setup translates well to K8s, but you'll need persistent volumes and proper service discovery. Your video processing workload is perfect for demonstrating K8s scaling capabilities.

We can tackle the architecture cleanup first, then dedicate a full session to the K8s deployment!
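Scaling "based on queue depth" reduces to a small decision rule. In practice the depth would come from Redis (`LLEN` on the Celery queue) and the scaling would be driven by an HPA external metric or KEDA, but the rule itself can be sketched as a pure function; every name and threshold here is a hypothetical example, not anything in the repo:

```python
def desired_workers(queue_depth: int, tasks_per_worker: int = 10,
                    min_workers: int = 1, max_workers: int = 8) -> int:
    """How many Celery worker replicas to run for the current backlog."""
    # Ceiling division: one worker per `tasks_per_worker` queued tasks.
    needed = -(-queue_depth // tasks_per_worker)
    return max(min_workers, min(max_workers, needed))

print(desired_workers(0))    # → 1  (never below the floor)
print(desired_workers(35))   # → 4
print(desired_workers(500))  # → 8  (capped)
```

The floor keeps one warm worker so a fresh segment is never waiting on pod startup; the cap protects the node's CPU/memory budget, which matters here because each CLIP worker holds the model in RAM.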
@@ -93,3 +93,89 @@ add aliases to avoid relative imports
Your system should now show live logo detection when you hold up branded items! 🏷️

> cool, just a question, don't do anything, but I need more details on why Celery for the AI processing task. I reckon that some queue is needed for compute- and/or IO-heavy interactions like AI processing, which is not just passing through what comes from the stream, but yeah, need some depth

● You're absolutely right to question this! Let me explain the specific reasons for Celery in AI video processing:

Why Celery for AI Processing?

1. Processing Time Reality

- CLIP model inference: 200-500ms per frame on CPU, 50-100ms on GPU
- Frame extraction: 50-200ms per video segment
- Total per segment: 250-700ms minimum
- Problem: the Django request-response cycle would time out, blocking the web server

2. HLS Streaming Constraints

- HLS segments: created every 2-6 seconds by FFmpeg
- Analysis lag: if processing takes 500ms, you're always behind
- User experience: the video player can't wait for AI analysis to complete
- Solution: async processing lets video serve immediately, AI catches up

3. Resource Management

- Memory intensive: each CLIP model uses 400MB-1GB RAM
- CPU bound: AI inference blocks the entire process
- Multiple streams: 5 concurrent streams = 5x resource usage
- Queue benefits: limit concurrent AI tasks, prevent memory exhaustion

4. Scaling Considerations

- Single server: process 2-3 streams simultaneously
- Multiple workers: Celery scales across machines/GPUs
- Priority queues: real-time streams get priority over batch analysis
- Failure handling: retry failed analyses, dead letter queues

Alternative Approaches

Without Celery (synchronous):

    # This blocks the Django thread for 500ms+
    def serve_hls_file(request, filename):
        # ... serve file ...
        analysis = analyze_frame_now(segment)  # BLOCKS HERE
        return response  # Takes 500ms+ total

With threading (risky):

    # Memory leaks, no queue management, hard to monitor
    import threading
    threading.Thread(target=analyze_frame, args=(segment,)).start()

With asyncio (complex):

    # Requires the entire Django app to be async, complex error handling
    async def analyze_frame_async():
        # Works but harder to manage resources

Your Architecture Benefits

Current flow:

1. HLS segment created → served immediately (fast)
2. Analysis queued → Celery worker picks it up
3. Results ready → WebSocket pushes to frontend
4. User sees: video plays smoothly + analysis appears seconds later

Without queue:

1. HLS segment created → wait for AI analysis (slow)
2. User experience: choppy video, timeouts
3. Server: one slow AI task blocks everything

Business Value

- Real-time streaming: video never stutters
- Scalable: add more Celery workers as needed
- Reliable: failed analyses retry automatically
- Monitorable: queue depth shows system load

For media monitoring at scale (like Seenka's use case), you need this separation because analyzing "millions of broadcasts" means handling thousands of concurrent streams - impossible without proper async processing.

The queue is essential for production-grade video AI systems!
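The hand-off pattern the snippets above argue for can be demonstrated with the standard library alone: the request path only enqueues work and returns, while a bounded pool does the slow part. This is a stand-in sketch for what Celery provides (plus retries, routing, and cross-machine scaling), not the project's actual serving code, and `analyze_segment` is a fake that just sleeps:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def analyze_segment(path: str) -> str:
    time.sleep(0.2)  # stand-in for ~200-500ms of CLIP inference
    return f"analyzed {path}"

pool = ThreadPoolExecutor(max_workers=2)  # bounded, like a worker pool

def serve_segment(path: str):
    """Serve immediately; analysis runs in the background."""
    future = pool.submit(analyze_segment, path)  # enqueue, don't wait
    return "served", future

start = time.monotonic()
status, future = serve_segment("segment-17.ts")
serve_latency = time.monotonic() - start

print(status)               # → served
print(serve_latency < 0.1)  # → True: the 200ms analysis didn't block serving
print(future.result())      # → analyzed segment-17.ts
```

The same shape, with `pool.submit` replaced by `task.delay`, is exactly the HLS flow described above: the segment is served in milliseconds and the detection arrives over the WebSocket when the worker finishes.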
101
media-analyzer/backend/ai_processing/config_manager.py
Normal file
@@ -0,0 +1,101 @@
import logging
import threading
from typing import Dict, Optional, Any
from django.core.cache import cache
from .models import AnalysisProvider

logger = logging.getLogger(__name__)


class AnalysisConfigManager:
    """Singleton configuration manager for analysis providers"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if not self._initialized:
            self._config_cache = {}
            self._providers_cache = {}
            self._cache_key = "analysis_providers_config"
            self._initialized = True
            self.reload_config()

    def reload_config(self) -> None:
        """Reload provider configuration from database"""
        try:
            providers = AnalysisProvider.objects.filter(active=True)

            # Cache providers by type
            self._providers_cache = {}
            config = {}

            for provider in providers:
                self._providers_cache[provider.provider_type] = {
                    'id': provider.id,
                    'name': provider.name,
                    'provider_type': provider.provider_type,
                    'model_identifier': provider.model_identifier,
                    'capabilities': provider.capabilities,
                    'config': provider.api_config,
                    'active': provider.active
                }

                # Build analysis type configuration
                for capability in provider.capabilities:
                    config[capability] = {
                        'provider_type': provider.provider_type,
                        'model_identifier': provider.model_identifier,
                        'config': provider.api_config
                    }

            self._config_cache = config

            # Cache in Django cache for other workers
            cache.set(self._cache_key, {
                'providers': self._providers_cache,
                'config': self._config_cache
            }, timeout=3600)  # 1 hour

            logger.info(f"Configuration reloaded: {len(providers)} active providers")

        except Exception as e:
            logger.error(f"Failed to reload configuration: {e}")
            # Try to load from cache as fallback
            cached_data = cache.get(self._cache_key)
            if cached_data:
                self._providers_cache = cached_data['providers']
                self._config_cache = cached_data['config']
                logger.info("Loaded configuration from cache as fallback")

    def get_provider_config(self, analysis_type: str) -> Optional[Dict[str, Any]]:
        """Get configuration for specific analysis type"""
        return self._config_cache.get(analysis_type)

    def get_provider_by_type(self, provider_type: str) -> Optional[Dict[str, Any]]:
        """Get provider info by provider type"""
        return self._providers_cache.get(provider_type)

    def has_capability(self, analysis_type: str) -> bool:
        """Check if any provider supports the analysis type"""
        return analysis_type in self._config_cache

    def get_active_capabilities(self) -> list:
        """Get list of all supported analysis capabilities"""
        return list(self._config_cache.keys())

    def get_config_for_engine(self) -> Dict[str, Any]:
        """Get configuration in format expected by AnalysisEngine"""
        return self._config_cache.copy()


# Global instance
config_manager = AnalysisConfigManager()
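The `__new__` above uses double-checked locking so that concurrent first calls still produce a single instance. A self-contained illustration of the same pattern, stripped of the Django parts:

```python
import threading

class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # First check avoids taking the lock on the hot path;
        # second check closes the race between two simultaneous first callers.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

instances = []
threads = [threading.Thread(target=lambda: instances.append(Singleton()))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(set(id(i) for i in instances)))  # → 1: every thread got the same object
```

Note that in `AnalysisConfigManager` the `_initialized` flag plays the same role for `__init__`, which Python calls on every `AnalysisConfigManager()` even when `__new__` returned the cached instance.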
@@ -0,0 +1,19 @@
# Generated migration to make provider field nullable

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

    dependencies = [
        ('ai_processing', '0001_initial'),
    ]

    operations = [
        migrations.AlterField(
            model_name='videoanalysis',
            name='provider',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='ai_processing.analysisprovider'),
        ),
    ]
@@ -32,7 +32,7 @@ class VideoAnalysis(models.Model):
     segment_path = models.CharField(max_length=500)
     timestamp = models.DateTimeField(auto_now_add=True)
     processing_time = models.FloatField(null=True)
-    provider = models.ForeignKey(AnalysisProvider, on_delete=models.CASCADE)
+    provider = models.ForeignKey(AnalysisProvider, on_delete=models.CASCADE, null=True, blank=True)
     analysis_type = models.CharField(max_length=50)
     confidence_threshold = models.FloatField(default=0.5)
     frame_timestamp = models.FloatField()
@@ -46,7 +46,7 @@ class VideoAnalysis(models.Model):
             'processing_time': self.processing_time,
             'analysis_type': self.analysis_type,
             'frame_timestamp': self.frame_timestamp,
-            'provider': self.provider.name,
+            'provider': self.provider.name if self.provider else 'local',
             'detections': [d.to_dict() for d in self.detections.all()],
             'visual': self.visual.to_dict() if hasattr(self, 'visual') else None
         }
@@ -4,14 +4,117 @@ from channels.layers import get_channel_layer
 from asgiref.sync import async_to_sync
 from .analysis_engine import AnalysisEngine
 from .models import VideoAnalysis, DetectionResult, VisualAnalysis, ProcessingQueue, AnalysisProvider
+from .config_manager import config_manager

 logger = logging.getLogger(__name__)
 channel_layer = get_channel_layer()

-@shared_task(bind=True)
-def process_video_segment(self, stream_id, segment_path):
-    """Process a video segment with AI analysis"""
+@shared_task(bind=True, queue='logo_detection')
+def analyze_logo_detection(self, stream_id, segment_path):
+    """Dedicated task for logo detection analysis"""
+    queue_item = None
+    try:
+        # Update queue status
+        queue_item = ProcessingQueue.objects.filter(
+            stream_id=stream_id,
+            segment_path=segment_path,
+            status='pending'
+        ).first()
+
+        if queue_item:
+            queue_item.status = 'processing'
+            queue_item.save()
+
+        # Check if logo detection is configured
+        if not config_manager.has_capability('logo_detection'):
+            logger.error("No logo detection provider configured")
+            if queue_item:
+                queue_item.status = 'failed'
+                queue_item.error_message = 'No logo detection provider configured'
+                queue_item.save()
+            return {"error": "No logo detection provider configured"}
+
+        # Initialize analysis engine with cached config
+        engine = AnalysisEngine()
+        logo_config = config_manager.get_provider_config('logo_detection')
+        engine.configure_providers({'logo_detection': logo_config})
+
+        # Extract and analyze frame
+        frame = engine.extract_frame_from_segment(segment_path)
+        if not frame:
+            logger.error(f"Failed to extract frame from {segment_path}")
+            if queue_item:
+                queue_item.status = 'failed'
+                queue_item.error_message = 'Failed to extract frame from segment'
+                queue_item.save()
+            return {"error": "Failed to extract frame"}
+
+        # Analyze for logos only
+        analysis_results = engine.analyze_frame(frame, ['logo_detection'], confidence_threshold=0.3)
+
+        # Store results
+        provider_info = config_manager.get_provider_by_type(logo_config['provider_type'])
+        provider = AnalysisProvider.objects.get(id=provider_info['id'])
+
+        analysis = VideoAnalysis.objects.create(
+            stream_id=stream_id,
+            segment_path=segment_path,
+            provider=provider,
+            analysis_type='logo_detection',
+            frame_timestamp=0.0,
+            confidence_threshold=0.3
+        )
+
+        detections = []
+        if 'logos' in analysis_results:
+            for logo in analysis_results['logos']:
+                detection = DetectionResult.objects.create(
+                    analysis=analysis,
+                    label=logo['label'],
+                    confidence=logo['confidence'],
+                    bbox_x=logo['bbox']['x'],
+                    bbox_y=logo['bbox']['y'],
+                    bbox_width=logo['bbox']['width'],
+                    bbox_height=logo['bbox']['height'],
+                    detection_type='logo'
+                )
+                detections.append(detection.to_dict())
+
+        # Send results via WebSocket if detections found
+        if detections:
+            async_to_sync(channel_layer.group_send)(
+                f"stream_{stream_id}",
+                {
+                    "type": "analysis_update",
+                    "analysis": analysis.to_dict()
+                }
+            )
+
+        # Update queue status
+        if queue_item:
+            queue_item.status = 'completed'
+            queue_item.save()
+
+        if detections:
+            logger.info(f"Logo detection: {len(detections)} detections found")
+        else:
+            logger.debug("Logo detection: no detections found")
+        return {"detections": len(detections), "analysis_id": str(analysis.id)}
+
+    except Exception as e:
+        logger.error(f"Logo detection failed for {segment_path}: {e}")
+        if queue_item:
+            queue_item.status = 'failed'
+            queue_item.error_message = str(e)
+            queue_item.save()
+        raise self.retry(countdown=60, max_retries=3)
+
+
+@shared_task(bind=True, queue='visual_analysis')
+def analyze_visual_properties(self, stream_id, segment_path):
+    """Dedicated task for visual property analysis"""
+    queue_item = None
     try:
         # Update queue status
         queue_item = ProcessingQueue.objects.filter(
@@ -27,138 +130,97 @@ def process_video_segment(self, stream_id, segment_path):
         # Initialize analysis engine
         engine = AnalysisEngine()

-        # Debug: Check all providers
-        all_providers = AnalysisProvider.objects.all()
-        logger.info(f"Found {all_providers.count()} total providers:")
-        for p in all_providers:
-            logger.info(f"  - {p.name}: {p.provider_type} (active: {p.active})")
-
-        # Get logo detection provider
-        logo_provider = AnalysisProvider.objects.filter(
-            provider_type='local_clip'
-        ).first()
-
-        if not logo_provider:
-            logger.error("No CLIP provider found in database at all!")
+        # Extract and analyze frame
+        frame = engine.extract_frame_from_segment(segment_path)
+        if not frame:
+            logger.error(f"Failed to extract frame from {segment_path}")
             if queue_item:
                 queue_item.status = 'failed'
-                queue_item.error_message = 'No CLIP provider in database'
+                queue_item.error_message = 'Failed to extract frame from segment'
                 queue_item.save()
-            return {"error": "No CLIP provider in database"}
+            return {"error": "Failed to extract frame"}

-        logger.info(f"Found CLIP provider: {logo_provider.name} (active: {logo_provider.active})")
-
-        if not logo_provider.active:
-            logo_provider.active = True
-            logo_provider.save()
-            logger.info(f"Auto-activated CLIP provider: {logo_provider.name}")
+        # Analyze visual properties (always available locally)
+        analysis_results = engine.analyze_frame(frame, ['visual_analysis'])

-        if logo_provider:
-            # Configure engine with logo detection
-            config = {
-                'logo_detection': {
-                    'provider_type': 'local_clip',
-                    'model_identifier': logo_provider.model_identifier
-                }
-            }
-            logger.info(f"Configuring engine with config: {config}")
-            engine.configure_providers(config)
-            logger.info("Engine configuration completed")
+        # Store results (no provider needed for local visual analysis)
+        analysis = VideoAnalysis.objects.create(
+            stream_id=stream_id,
+            segment_path=segment_path,
+            provider=None,  # Local analysis
+            analysis_type='visual_analysis',
+            frame_timestamp=0.0,
+            confidence_threshold=0.0
+        )

-            # Extract frame from segment
-            logger.info(f"Extracting frame from: {segment_path}")
-            frame = engine.extract_frame_from_segment(segment_path)
-            if frame:
-                logger.info(f"Frame extracted successfully, size: {frame.size}")
-                # Analyze frame for logos
-                logger.info("Starting frame analysis...")
-                analysis_results = engine.analyze_frame(
-                    frame,
-                    ['logo_detection', 'visual_analysis'],
-                    confidence_threshold=0.3
-                )
-                logger.info(f"Analysis results: {analysis_results}")
-
-                # Store analysis results
-                analysis = VideoAnalysis.objects.create(
-                    stream_id=stream_id,
-                    segment_path=segment_path,
-                    provider=logo_provider,
-                    analysis_type='logo_detection',
-                    frame_timestamp=0.0,
-                    confidence_threshold=0.3
-                )
-
-                # Store detections
-                detections = []
-                if 'logos' in analysis_results:
-                    for logo in analysis_results['logos']:
-                        detection = DetectionResult.objects.create(
-                            analysis=analysis,
-                            label=logo['label'],
-                            confidence=logo['confidence'],
-                            bbox_x=logo['bbox']['x'],
-                            bbox_y=logo['bbox']['y'],
-                            bbox_width=logo['bbox']['width'],
-                            bbox_height=logo['bbox']['height'],
-                            detection_type='logo'
-                        )
-                        detections.append(detection.to_dict())
-
-                # Store visual analysis
-                if 'visual' in analysis_results:
-                    VisualAnalysis.objects.create(
-                        analysis=analysis,
-                        dominant_colors=analysis_results['visual']['dominant_colors'],
-                        brightness_level=analysis_results['visual']['brightness_level'],
-                        contrast_level=analysis_results['visual']['contrast_level'],
-                        saturation_level=analysis_results['visual']['saturation_level']
-                    )
-
-                # Send results via WebSocket
-                if detections:
-                    async_to_sync(channel_layer.group_send)(
-                        f"stream_{stream_id}",
-                        {
-                            "type": "analysis_update",
-                            "analysis": analysis.to_dict()
-                        }
-                    )
-
-                # Update queue status
-                if queue_item:
-                    queue_item.status = 'completed'
-                    queue_item.save()
-
-                logger.info(f"Processed segment {segment_path}: {len(detections)} detections")
-                return {"detections": len(detections), "analysis_id": str(analysis.id)}
-            else:
-                logger.error("Failed to extract frame from segment")
-                if queue_item:
-                    queue_item.status = 'failed'
-                    queue_item.error_message = 'Failed to extract frame from video segment'
-                    queue_item.save()
-                return {"error": "Failed to extract frame from segment"}
-
-        # No provider configured
+        # Store visual analysis
+        if 'visual' in analysis_results:
+            VisualAnalysis.objects.create(
+                analysis=analysis,
+                dominant_colors=analysis_results['visual']['dominant_colors'],
+                brightness_level=analysis_results['visual']['brightness_level'],
+                contrast_level=analysis_results['visual']['contrast_level'],
+                saturation_level=analysis_results['visual']['saturation_level']
+            )
+
+        # Send results via WebSocket
+        async_to_sync(channel_layer.group_send)(
+            f"stream_{stream_id}",
+            {
+                "type": "analysis_update",
+                "analysis": analysis.to_dict()
+            }
+        )
+
+        # Update queue status
         if queue_item:
-            queue_item.status = 'failed'
-            queue_item.error_message = 'No active AI provider configured'
+            queue_item.status = 'completed'
             queue_item.save()
-        return {"error": "No AI provider configured"}
+
+        logger.debug(f"Visual analysis completed for {segment_path}")
+        return {"analysis_id": str(analysis.id)}

     except Exception as e:
-        logger.error(f"Error processing segment {segment_path}: {e}")
+        logger.error(f"Visual analysis failed for {segment_path}: {e}")
         if queue_item:
             queue_item.status = 'failed'
             queue_item.error_message = str(e)
             queue_item.save()
         raise self.retry(countdown=60, max_retries=3)
+
+
+@shared_task(bind=True)
+def process_video_segment(self, stream_id, segment_path):
+    """Main task that dispatches to specialized analysis tasks"""
+    try:
+        # Dispatch to specialized queues based on available capabilities
+        active_capabilities = config_manager.get_active_capabilities()
+
+        if 'logo_detection' in active_capabilities:
+            analyze_logo_detection.delay(stream_id, segment_path)
+
+        # Visual analysis disabled for performance - only logo detection
+        # analyze_visual_properties.delay(stream_id, segment_path)
+
+        return {"dispatched": True, "capabilities": active_capabilities}
+
+    except Exception as e:
+        logger.error(f"Failed to dispatch analysis for {segment_path}: {e}")
+        return {"error": str(e)}
+
+
+@shared_task(queue='config_management')
+def reload_analysis_config():
+    """Task to reload analysis provider configuration"""
+    try:
+        config_manager.reload_config()
+        logger.info("Analysis configuration reloaded successfully")
+        return {"status": "success", "capabilities": config_manager.get_active_capabilities()}
+    except Exception as e:
+        logger.error(f"Failed to reload analysis configuration: {e}")
+        return {"status": "error", "message": str(e)}


 @shared_task
 def analyze_frame_task(stream_id, segment_path, frame_timestamp=0.0):
     """Analyze a single frame from video segment"""
@@ -1,5 +1,8 @@
 import os
 from celery import Celery
+from celery.signals import worker_ready
+import django
+import logging

 # Set the default Django settings module for the 'celery' program
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
@@ -13,6 +16,22 @@ app.config_from_object('django.conf:settings', namespace='CELERY')
|
|||||||
# Load task modules from all registered Django apps.
|
# Load task modules from all registered Django apps.
|
||||||
app.autodiscover_tasks()
|
app.autodiscover_tasks()
|
||||||
|
|
||||||
|
@worker_ready.connect
|
||||||
|
def worker_ready_handler(sender=None, **kwargs):
|
||||||
|
"""Initialize configuration when worker starts"""
|
||||||
|
import django
|
||||||
|
django.setup()
|
||||||
|
|
||||||
|
from ai_processing.config_manager import config_manager
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
config_manager.reload_config()
|
||||||
|
capabilities = config_manager.get_active_capabilities()
|
||||||
|
logger.info(f"Worker ready - loaded capabilities: {capabilities}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to initialize worker configuration: {e}")
|
||||||
|
|
||||||
@app.task(bind=True, ignore_result=True)
|
@app.task(bind=True, ignore_result=True)
|
||||||
def debug_task(self):
|
def debug_task(self):
|
||||||
print(f'Request: {self.request!r}')
|
print(f'Request: {self.request!r}')
|
||||||
Django settings (Celery section):

```diff
@@ -95,6 +95,30 @@ CELERY_TASK_SERIALIZER = 'json'
 CELERY_RESULT_SERIALIZER = 'json'
 CELERY_TIMEZONE = 'UTC'
 
+# Celery queue routing
+CELERY_TASK_ROUTES = {
+    'ai_processing.tasks.analyze_logo_detection': {'queue': 'logo_detection'},
+    'ai_processing.tasks.analyze_visual_properties': {'queue': 'visual_analysis'},
+    'ai_processing.tasks.reload_analysis_config': {'queue': 'config_management'},
+}
+
+# Define queues with different priorities
+CELERY_TASK_DEFAULT_QUEUE = 'default'
+CELERY_TASK_QUEUES = {
+    'default': {
+        'routing_key': 'default',
+    },
+    'logo_detection': {
+        'routing_key': 'logo_detection',
+    },
+    'visual_analysis': {
+        'routing_key': 'visual_analysis',
+    },
+    'config_management': {
+        'routing_key': 'config_management',
+    },
+}
 
 # Database
 # https://docs.djangoproject.com/en/5.2/ref/settings/#databases
```
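The effect of the `CELERY_TASK_ROUTES` mapping above is a lookup from task name to queue, falling back to the default queue. A simplified sketch of that resolution (Celery's real router also supports glob patterns and router classes, which this ignores):

```python
# Simplified illustration of name-based task routing: each fully qualified
# task name maps to a queue entry; unknown tasks fall through to the default.
TASK_ROUTES = {
    'ai_processing.tasks.analyze_logo_detection': {'queue': 'logo_detection'},
    'ai_processing.tasks.analyze_visual_properties': {'queue': 'visual_analysis'},
    'ai_processing.tasks.reload_analysis_config': {'queue': 'config_management'},
}
DEFAULT_QUEUE = 'default'

def route_for(task_name):
    # Mimics the lookup the router performs when a task is published.
    return TASK_ROUTES.get(task_name, {}).get('queue', DEFAULT_QUEUE)
```

So `process_video_segment` (not listed) lands on `default`, which is why the default worker below also consumes `config_management`.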
Requirements:

```diff
@@ -4,6 +4,7 @@ channels==4.0.0
 channels-redis==4.2.0
 celery==5.3.4
 redis==5.0.1
+uvicorn==0.24.0
 ffmpeg-python==0.2.0
 Pillow==10.0.1
 django-cors-headers==4.3.1
```
Compose file:

```diff
@@ -1,7 +1,7 @@
 version: '3.8'
 
 services:
-  # Django Backend
+  # Django Backend with ASGI (WebSocket support)
   backend:
     build: ./backend
     ports:
@@ -18,7 +18,9 @@ services:
     depends_on:
       postgres:
         condition: service_healthy
-    command: python manage.py runserver 0.0.0.0:8000
+      redis:
+        condition: service_started
+    command: uvicorn media_analyzer.asgi:application --host 0.0.0.0 --port 8000 --reload
 
   # Django Migrations (run manually)
   migrate:
@@ -37,6 +39,63 @@ services:
     command: python manage.py migrate
     profiles: ["tools"]  # Only runs when explicitly called
 
+  # Celery Worker - Logo Detection Queue
+  celery-logo:
+    build: ./backend
+    volumes:
+      - ./backend:/app
+      - ./media:/app/media
+    environment:
+      - DEBUG=1
+      - DB_HOST=postgres
+      - DB_NAME=media_analyzer
+      - DB_USER=media_user
+      - DB_PASSWORD=media_pass
+    depends_on:
+      postgres:
+        condition: service_healthy
+      redis:
+        condition: service_started
+    command: celery -A media_analyzer worker --loglevel=info --queues=logo_detection --hostname=logo-worker@%h
+
+  # Celery Worker - Visual Analysis Queue
+  celery-visual:
+    build: ./backend
+    volumes:
+      - ./backend:/app
+      - ./media:/app/media
+    environment:
+      - DEBUG=1
+      - DB_HOST=postgres
+      - DB_NAME=media_analyzer
+      - DB_USER=media_user
+      - DB_PASSWORD=media_pass
+    depends_on:
+      postgres:
+        condition: service_healthy
+      redis:
+        condition: service_started
+    command: celery -A media_analyzer worker --loglevel=info --queues=visual_analysis --hostname=visual-worker@%h
+
+  # Celery Worker - Default Queue
+  celery-default:
+    build: ./backend
+    volumes:
+      - ./backend:/app
+      - ./media:/app/media
+    environment:
+      - DEBUG=1
+      - DB_HOST=postgres
+      - DB_NAME=media_analyzer
+      - DB_USER=media_user
+      - DB_PASSWORD=media_pass
+    depends_on:
+      postgres:
+        condition: service_healthy
+      redis:
+        condition: service_started
+    command: celery -A media_analyzer worker --loglevel=info --queues=default,config_management --hostname=default-worker@%h
 
 # # Angular Frontend
 # frontend:
 #   build: ./frontend
```
Analysis panel template:

```diff
@@ -1,29 +1,31 @@
 <div class="analysis-panel">
   <h3>Live Analysis Results</h3>
 
-  @if (currentVisual) {
-    <div class="visual-analysis">
-      <h4>Visual Properties</h4>
-      <div class="property">
-        <span>Brightness:</span>
-        <div class="brightness-bar">
-          <div class="brightness-fill" [style.width.%]="currentVisual.brightness_level * 100"></div>
-        </div>
-        <span>{{(currentVisual.brightness_level * 100) | number:'1.0-0'}}%</span>
-      </div>
-      @if (currentVisual.dominant_colors && currentVisual.dominant_colors.length > 0) {
-        <div class="property">
-          <span>Colors:</span>
-          <div class="color-palette">
-            @for (color of currentVisual.dominant_colors; track $index) {
-              <div class="color-swatch" [style.background]="getColorStyle([color])"></div>
-            }
-          </div>
-        </div>
-      }
-    </div>
-  }
+  <div class="visual-analysis" [class.disabled]="!currentVisual">
+    <h4>Visual Properties <span class="status">({{ currentVisual ? 'Active' : 'Disabled' }})</span></h4>
+    <div class="property">
+      <span>Brightness:</span>
+      <div class="brightness-bar">
+        <div class="brightness-fill" [style.width.%]="currentVisual ? (currentVisual.brightness_level * 100) : 0"></div>
+      </div>
+      <span>{{ currentVisual ? ((currentVisual.brightness_level * 100) | number:'1.0-0') : '--' }}%</span>
+    </div>
+
+    <div class="property">
+      <span>Colors:</span>
+      <div class="color-palette">
+        @if (currentVisual && currentVisual.dominant_colors && currentVisual.dominant_colors.length > 0) {
+          @for (color of currentVisual.dominant_colors; track $index) {
+            <div class="color-swatch" [style.background]="getColorStyle([color])"></div>
+          }
+        } @else {
+          <div class="color-swatch disabled"></div>
+          <div class="color-swatch disabled"></div>
+          <div class="color-swatch disabled"></div>
+        }
+      </div>
+    </div>
+  </div>
 
   @if (getDetectionsByType('logo').length > 0) {
     <div class="detection-section">
```
Analysis panel styles:

```diff
@@ -19,6 +19,37 @@
 .visual-analysis {
   margin-bottom: 1.5rem;
 
+  &.disabled {
+    opacity: 0.5;
+
+    h4 {
+      color: #999;
+
+      .status {
+        font-size: 0.8rem;
+        font-weight: normal;
+        color: #666;
+      }
+    }
+
+    .property span {
+      color: #999;
+    }
+
+    .brightness-bar {
+      background: #f0f0f0;
+
+      .brightness-fill {
+        background: #e0e0e0;
+      }
+    }
+
+    .color-swatch.disabled {
+      background: #f0f0f0 !important;
+      border-color: #ddd;
+    }
+  }
 
 .property {
   display: flex;
   align-items: center;
```
StreamViewerComponent:

```diff
@@ -1,4 +1,4 @@
-import { Component, ElementRef, Input, ViewChild, AfterViewInit, OnDestroy, OnChanges } from '@angular/core';
+import { Component, ElementRef, Input, ViewChild, AfterViewInit, OnDestroy, OnChanges, SimpleChanges } from '@angular/core';
 import Hls from 'hls.js';
 import { Analysis, DetectionResult } from '../../models/analysis';
 
@@ -26,13 +26,14 @@ export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChange
     }
   }
 
-  ngOnChanges() {
-    if (this.streamUrl && this.videoElement) {
+  ngOnChanges(changes: SimpleChanges) {
+    // Only reload stream if the streamUrl actually changed
+    if (changes['streamUrl'] && this.streamUrl && this.videoElement) {
       this.loadStream(this.streamUrl);
     }
 
     // Redraw detections when they change
-    if (this.ctx) {
+    if (changes['detections'] && this.ctx) {
       this.drawDetections();
     }
   }
```
progress.md
```diff
@@ -24,22 +24,22 @@
 │ │ │ [x] Basic error handling │ [ ] CDN integration ready │
 ├─────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┤
 │ 🤖 AI Analysis │ Backend: │ Backend: │ Backend: │
-│ │ [x] Django ai_processing app │ [ ] Frame extraction from HLS │ [ ] Distributed processing work │
+│ │ [x] Django ai_processing app │ [x] Frame extraction from HLS │ [ ] Distributed processing work │
-│ │ [ ] Video analysis models │ [ ] YOLO object detection │ [ ] WebSocket analysis streaming │
+│ │ [x] Video analysis models │ [x] CLIP logo detection │ [x] WebSocket analysis streaming │
-│ │ [ ] YOLO/CLIP model loading │ [ ] Analysis results storage │ [ ] Redis result caching │
+│ │ [x] CLIP model loading │ [x] Analysis results storage │ [x] Redis result caching │
-│ │ [x] Processing strategy pattern │ [ ] Results API endpoint │ [ ] Batch vs real-time modes │
+│ │ [x] Processing strategy pattern │ [x] Results API endpoint │ [x] Batch vs real-time modes │
 │ │ │ │ │
 │ │ Frontend: │ Frontend: │ Frontend: │
-│ │ [x] Analysis display components │ [ ] Detection overlay on video │ [ ] Live analysis updates │
+│ │ [x] Analysis display components │ [x] Detection overlay on video │ [x] Live analysis updates │
-│ │ [ ] Results visualization svc │ [ ] Results panel display │ [ ] Analysis history timeline │
+│ │ [x] Results visualization svc │ [x] Results panel display │ [ ] Analysis history timeline │
-│ │ [ ] Detection overlay system │ [ ] Analysis trigger controls │ [ ] Performance dashboards │
+│ │ [x] Detection overlay system │ [x] Analysis trigger controls │ [ ] Performance dashboards │
-│ │ │ [ ] Object list and filtering │ [ ] Multi-stream analysis view │
+│ │ │ [x] Object list and filtering │ [ ] Multi-stream analysis view │
 │ │ │ │ │
 │ │ Data/Config: │ Data/Config: │ Data/Config: │
-│ │ [ ] Analysis results model │ [ ] Local model storage │ [ ] K8s GPU node pools │
+│ │ [x] Analysis results model │ [x] Local model storage │ [ ] K8s GPU node pools │
-│ │ [ ] Object detection schema │ [ ] Processing queue setup │ [ ] Analysis result streaming │
+│ │ [x] Object detection schema │ [x] Processing queue setup │ [x] Analysis result streaming │
-│ │ [ ] AI model configurations │ [ ] Result data persistence │ [ ] Model versioning system │
+│ │ [x] AI model configurations │ [x] Result data persistence │ [ ] Model versioning system │
-│ │ [ ] Celery task definitions │ [ ] Basic performance metrics │ [ ] Cloud storage integration │
+│ │ [x] Celery task definitions │ [x] Basic performance metrics │ [ ] Cloud storage integration │
 ├─────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┤
 │ 🎨 Effects Pipeline │ Backend: │ Backend: │ Backend: │
 │ │ [x] Django effects app │ [ ] Basic shader pipeline │ [ ] GPU cluster scheduling │
```