add AI analysis backend and logo detection

This commit is contained in:
buenosairesam
2025-08-25 03:59:20 -03:00
parent c21cc32727
commit 54b401413f
42 changed files with 2812 additions and 23 deletions

View File

@@ -407,3 +407,101 @@ This approach gives you:
- **Easy CI/CD** since K8s manifests are tested locally
The planning already supports this perfectly - you develop fast locally and test orchestration when needed!
CURRENT STEP
now that I have a basic stream pipeline I need to add an AI feature (something simple) before scaling everything up. I was thinking of detecting my extended hand, but maybe a more business-oriented feature would work better (read the very first message with the JD in Spanish to think about this)
Looking back at the job description, they mention analyzing "millones de emisiones en medios y redes sociales" for communication and advertising optimization. For a business-oriented AI feature that aligns with Seenka's focus, I'd suggest:
🎯 Brand/Logo Detection
Business Value: Identify brands and logos in video streams (like TV broadcasts, social media content)
Detect when competitors appear in content
Monitor brand visibility and placement
Track advertising effectiveness
Technical Implementation:
Use CLIP for general brand recognition
Train on common logos/brand imagery
Output: Brand name, confidence, screen position, duration
📺 Content Classification
Business Value: Categorize video content for media monitoring
Detect content types: "news", "sports", "entertainment", "advertising"
Identify show segments vs commercial breaks
Monitor content themes and topics
Technical Implementation:
CLIP scene classification with predefined categories
Frame sampling every few seconds
Output: Content category, confidence, timestamp
🗣️ Speaker Detection
Business Value: Media monitoring and audience analysis
Count people on screen (audience size estimation)
Detect when someone is speaking/presenting
Monitor speaker time and engagement
Technical Implementation:
YOLO for person detection
Basic pose estimation to detect "presentation pose"
Output: Person count, speaking activity, screen position
My Recommendation: Content Classification
Why it's perfect for your demo:
Business relevant: Directly matches Seenka's media analysis focus
Technically achievable: CLIP can classify scenes out-of-the-box
Visually impressive: Live categorization overlay on your OBS stream
Extensible: Easy to add more categories or confidence thresholds
Demo scenario: Stream different content types with OBS (news clips, sports, entertainment) and watch the AI classify them in real-time.
Which direction appeals to you most? The hand detection is cool for personal interaction, but content classification would really showcase understanding of their business domain.
I'm using streams of myself (I have a decent setup), so for now the input will be just that, but I think it can be fun if I display logos with my phone or something like that
Perfect! Logo detection with your phone is brilliant - it's both business-relevant (brand monitoring) and interactive demo-friendly. You can hold up logos on your phone screen or physical items and watch the AI detect them in real-time.
🏷️ Logo Detection Implementation
Simple Approach (Weekend-friendly):
CLIP-based Brand Recognition:
```python
# Use CLIP's zero-shot classification
brands = ["Apple", "Google", "Nike", "Coca-Cola", "McDonald's", "Amazon"]
# Hold up logos on phone/laptop and classify scenes
```
Demo Flow:
You stream yourself with OBS
Hold up different brand logos on your phone/laptop screen
AI detects and labels them with confidence scores
Live overlay shows: "Apple Logo detected (87% confidence)"
Enhanced Approach:
Combine YOLO + CLIP:
YOLO: Detect objects in scene (phone, laptop, clothing, etc.)
CLIP: Classify detected regions for brand/logo content
Output: "iPhone detected", "Nike logo on shirt", "Starbucks cup visible"
Fun Demo Scenarios:
Tech Logos: Show Apple, Google, Microsoft logos on your devices
Fashion Brands: Wear branded clothing/accessories
Food/Drink: Hold up branded items from your desk
Mixed Reality: Point phone camera at branded items around your room
Business Value Demo:
Brand Monitoring: "Track when competitors appear in streams"
Advertising Analysis: "Measure brand visibility duration and positioning"
Content Compliance: "Detect unauthorized brand usage"
Implementation in Your Matrix:
This fits perfectly in AI Analysis → Local Single Server:
Frame extraction from HLS ✓
CLIP logo classification (instead of YOLO objects)
Analysis results storage
Results API endpoint
Want to start with basic CLIP brand classification, or go straight for the object detection + brand recognition combo?

View File

@@ -0,0 +1,166 @@
video/image analysis discussion
> that looks better, but I still think (I may be wrong) that GCP provides comprehensive video analysis, while CLIP just covers part of that; it's good to have this abstract class for adapters
but I think that maybe we should have the same for video analysis DON'T JUST DO IT help me think this through
 Interrupted by user
> that looks better, but I still think (I may be wrong) that GCP provides comprehensive video analysis, while CLIP just covers part of that; it's good to have this abstract class for adapters
but I think that maybe we should have the same for video analysis DON'T JUST DO IT help me think this through
✻ Thinking…
The user is asking me to think through the design before implementing. They're raising a good point about GCP providing comprehensive video analysis while CLIP only covers part of it.
Let me think about this:
1. GCP Video Intelligence API vs GCP Vision API:
- Vision API: Image analysis (objects, logos, text, faces)
- Video Intelligence API: Video-specific features (shot detection, explicit content, celebrity recognition, activity recognition, etc.)
2. CLIP limitations:
- CLIP is primarily for image-text matching
- Doesn't do bounding box detection natively
- Good for scene classification and brand/logo recognition via text prompts
- Not great for precise object detection with coordinates
3. YOLO capabilities:
- Excellent object detection with bounding boxes
- Real-time performance
- Limited to predefined object classes
- No logo/brand recognition out of the box
4. The user is suggesting we might need separate abstractions for:
- Image analysis (frame-by-frame)
- Video analysis (temporal, motion, scene transitions, etc.)
This makes sense because:
- Image analysis: Static frame analysis (objects, logos, text)
- Video analysis: Temporal features (motion tracking, scene changes, activity detection)
The current adapter pattern I was proposing mixes both concerns. Let me think about a better design:
ImageAnalysisAdapter:
- detect_objects()
- detect_logos()
- detect_text()
- analyze_visual_properties()
VideoAnalysisAdapter:
- detect_motion()
- analyze_scene_transitions()
- track_objects_over_time()
- detect_activities()
- analyze_audio() (if applicable)
This would allow:
- GCP Vision API -> ImageAnalysisAdapter
- GCP Video Intelligence API -> VideoAnalysisAdapter
- Local CLIP -> ImageAnalysisAdapter (partial)
- Local YOLO -> ImageAnalysisAdapter
- Custom motion detection -> VideoAnalysisAdapter
I recognize the importance of carefully designing the system architecture before jumping into implementation. Breaking down the analysis into specialized adapters provides flexibility
and allows for modular integration of different computer vision technologies.
● You're absolutely right to pause here! Let me think through this design challenge:
Current Issue: I'm mixing two different types of analysis:
1. Image Analysis (frame-by-frame):
- Objects, logos, text detection with bounding boxes
- Visual properties (colors, brightness)
- Works on single frames
2. Video Analysis (temporal):
- Motion tracking, scene transitions
- Activity detection over time
- Audio analysis
- Shot boundary detection
Provider Capabilities Mismatch:
- GCP Vision API: Image analysis only
- GCP Video Intelligence API: Comprehensive video analysis (motion, activities, explicit content, etc.)
- Local CLIP: Image classification/matching, no bboxes
- Local YOLO: Object detection with bboxes, no video temporal features
Design Question: Should we have separate adapter hierarchies?
# Option 1: Separate adapters
ImageAnalysisAdapter -> GCPVisionAdapter, CLIPAdapter, YOLOAdapter
VideoAnalysisAdapter -> GCPVideoIntelligenceAdapter, LocalMotionAdapter
# Option 2: Unified but capability-aware
AnalysisAdapter -> Different methods optional based on provider capabilities
What's your thinking? Separate adapters for image vs video analysis, or one flexible adapter that declares its capabilities?
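To make Option 2 concrete, here is a minimal sketch of a capability-aware adapter, assuming a `capabilities()` method and a `supports()` helper; all class and method names here are illustrative, not part of the codebase:

```python
from abc import ABC, abstractmethod


class AnalysisAdapter(ABC):
    """Option 2 sketch: one base class; each provider declares what it supports."""

    @abstractmethod
    def capabilities(self):
        """Return the set of analysis types this provider can run."""
        ...

    # Default implementations refuse; providers override only what they support.
    def detect_objects(self, image, confidence_threshold=0.5):
        raise NotImplementedError("object detection not supported by this provider")

    def analyze_motion(self, video_path):
        raise NotImplementedError("motion analysis not supported by this provider")


class CLIPAdapter(AnalysisAdapter):
    def capabilities(self):
        return {"logo_detection", "scene_classification"}  # image-only, no bboxes


class GCPVideoAdapter(AnalysisAdapter):
    def capabilities(self):
        return {"shot_detection", "object_tracking", "motion_analysis"}


def supports(adapter, capability):
    """Check a capability before dispatching work to an adapter."""
    return capability in adapter.capabilities()
```

The orchestrator can then route each requested analysis only to adapters whose declared capabilities include it, instead of guessing which methods exist.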
✻ Thinking…
The user is asking me to help them think through this design decision rather than just implementing something. This is a good design discussion about how to handle the different types
of analysis capabilities that various providers offer.
Let me think about the trade-offs:
Option 1: Separate Adapters
Pros:
- Clear separation of concerns
- Each adapter type has focused responsibility
- Easy to understand which provider does what
- Can optimize for specific analysis types
Cons:
- More complex factory logic
- Might need coordination between adapters for the same frame
- Could lead to code duplication
Option 2: Unified Adapter with Capabilities
Pros:
- Single interface to manage
- Provider declares what it can do
- Easier to coordinate multi-analysis on same frame
- Simpler factory pattern
Cons:
- Interface might become bloated
- Less clear separation between image and video analysis
- Might encourage putting unrelated functionality together
I think the user is leaning toward separate adapters based on their comment about GCP providing "comprehensive video analysis" vs CLIP covering "part of that". They seem to recognize
that these are fundamentally different types of analysis that shouldn't be forced into the same abstraction.
The streaming app they referenced uses different adapters for different source types (RTMP vs File), which is a good parallel - those are different input types that need different
handling, just like image analysis vs video analysis are different analysis types that need different handling.
I like having the unified adapter (at this point, "adapter" is not the proper term, as this would be a "main abstraction"). For what you reasoned, I think it is good to have a clear separation between video and image, but I think we should focus on what this "main abstraction" would do, and use the adapter classes to independently serve those "main abstractions" like object detection, text detection, logo detection (which have some implementation limitations, like having to classify functions based on whether it's a video thing or an image thing, and later an audio thing)
Brilliant insight! You're right - let's think about the main abstraction as what we want to analyze, and adapters as how to implement each specific capability.
Main Abstractions (Analysis Types):
- Object Detection
- Logo Detection
- Text Detection
- Motion Analysis
- Audio Analysis (future)
Adapter Layer (Implementation Methods):
- Per capability, not per provider
- Each adapter handles one specific analysis type
- Providers can implement multiple adapters
Design Structure:
# Main Analysis Engine
VideoAnalysisEngine
└── uses ObjectDetectionAdapter, LogoDetectionAdapter, etc.
# Capability-specific adapters
ObjectDetectionAdapter -> YOLOAdapter, GCPObjectAdapter
LogoDetectionAdapter -> CLIPLogoAdapter, GCPLogoAdapter
TextDetectionAdapter -> GCPTextAdapter, TesseractAdapter
MotionAnalysisAdapter -> OpenCVMotionAdapter, GCPVideoAdapter
Benefits:
- Each adapter focuses on one thing well
- Mix and match providers per capability
- Easy to add new providers for specific features
- Clear separation of image vs video vs audio processing
Should we design the main AnalysisEngine that orchestrates these capability-specific adapters?
> yes

View File

@@ -36,3 +36,60 @@ nginx
But for development, having CORS in both services is correct because they serve different content types to the browser.
on the AI feature, decide a model design; I'm not sure what actually matters here
> can you explain why this model design?
● Looking at this model design, here's the reasoning:
VideoAnalysis - Central hub that tracks each analysis run with metadata like processing time, confidence thresholds, and frame timestamps. This allows performance monitoring and
result correlation.
ObjectDetection - YOLO results with bounding boxes. Multiple detections can belong to one analysis (one frame can have many objects).
SceneAnalysis - CLIP results for scene understanding, text detection, and embeddings. One-to-one with VideoAnalysis since CLIP analyzes the whole frame context.
ProcessingQueue - Decouples video processing from real-time streaming. Allows configurable analysis types per segment (you might want only YOLO for some streams, CLIP+YOLO for
others).
Key design decisions:
- Separation of concerns: YOLO (objects) vs CLIP (scenes/text) have different data structures
- Flexible processing: Queue supports different AI model combinations per segment
- Performance tracking: Processing times and confidence thresholds for optimization
- Scalability: UUIDs and proper indexing for distributed processing
- Real-time ready: Frame timestamps enable overlay synchronization
The analysis_types JSON field in ProcessingQueue is particularly important - it lets you configure which AI models to run per stream (e.g., ['yolo', 'clip_scene'] for object detection
+ scene analysis).
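To make the `analysis_types` idea concrete, a queue entry might look like the following; every field name besides `analysis_types` is illustrative, not taken from the actual model:

```python
# Hypothetical ProcessingQueue entry: which AI models to run for one HLS segment
queue_entry = {
    "segment_path": "/media/hls/stream42/segment_0017.ts",  # illustrative path
    "analysis_types": ["yolo", "clip_scene"],  # object detection + scene analysis
    "confidence_threshold": 0.5,
}
```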
still hardcoding stuff???
class TextDetectionAdapterFactory(AdapterFactory):
    """Factory for text detection adapters"""

    @staticmethod
    def create(provider_config):
        provider_type = provider_config.get('provider_type')

        if provider_type == 'gcp_vision':
            return GCPTextDetectionAdapter()
        elif provider_type == 'local_tesseract':
            return TesseractTextDetectionAdapter()
        else:
            raise ValueError(f"Unknown text detection provider: {provider_type}")
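One way to answer the "still hardcoding stuff???" complaint is a registry-based factory, where each adapter registers itself and the `if/elif` chain disappears. This is a sketch of the pattern, not the project's code; the dummy adapter exists only to show registration:

```python
class TextDetectionAdapterFactory:
    """Registry-based variant: providers register themselves, no if/elif chain."""

    _registry = {}

    @classmethod
    def register(cls, provider_type):
        """Class decorator that maps a provider_type string to an adapter class."""
        def wrapper(adapter_cls):
            cls._registry[provider_type] = adapter_cls
            return adapter_cls
        return wrapper

    @classmethod
    def create(cls, provider_config):
        provider_type = provider_config.get('provider_type')
        adapter_cls = cls._registry.get(provider_type)
        if adapter_cls is None:
            raise ValueError(f"Unknown text detection provider: {provider_type}")
        return adapter_cls()


@TextDetectionAdapterFactory.register('local_dummy')
class DummyTextDetectionAdapter:
    """Illustrative adapter: registered under 'local_dummy', detects nothing."""
    def detect(self, image, confidence_threshold=0.5):
        return []
```

New providers then need only the decorator line; the factory itself never changes.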
add aliases to avoid relative imports
../frontend/src/app/components/stream-viewer/stream-viewer.component.ts

  import { Component, ElementRef, Input, ViewChild, AfterViewInit, OnDestroy, OnChanges } from '@angular/core';
  import Hls from 'hls.js';
+ import { Analysis, DetectionResult } from '../../models/analysis';

  @Component({
    selector: 'app-stream-viewer',

View File

@@ -0,0 +1,38 @@
from abc import ABC, abstractmethod


class DetectionAdapter(ABC):
    """Base class for detection adapters (image-based analysis)"""

    @abstractmethod
    def detect(self, image, confidence_threshold=0.5):
        """
        Detect features in image
        Returns: List of detection results
        [{'label': str, 'confidence': float, 'bbox': {'x': float, 'y': float, 'width': float, 'height': float}}]
        """
        pass


class VideoAnalysisAdapter(ABC):
    """Base class for video analysis adapters (temporal analysis)"""

    @abstractmethod
    def analyze(self, video_path, **kwargs):
        """
        Analyze video for temporal features
        Returns: Dict of analysis results
        """
        pass


class AdapterFactory(ABC):
    """Base factory for creating adapters"""

    @staticmethod
    @abstractmethod
    def create(provider_config):
        """Create adapter instance from provider configuration"""
        pass

View File

@@ -0,0 +1,131 @@
import logging
from .base import DetectionAdapter, AdapterFactory
import io

logger = logging.getLogger(__name__)


class GCPLogoDetectionAdapter(DetectionAdapter):
    """Google Cloud Vision logo detection"""

    def __init__(self):
        from google.cloud import vision
        self.client = vision.ImageAnnotatorClient()

    def detect(self, image, confidence_threshold=0.5):
        try:
            # Convert image to bytes
            img_byte_arr = io.BytesIO()
            image.save(img_byte_arr, format='JPEG')
            image_bytes = img_byte_arr.getvalue()

            # GCP Vision API call
            from google.cloud import vision
            vision_image = vision.Image(content=image_bytes)
            response = self.client.logo_detection(image=vision_image)

            results = []
            for logo in response.logo_annotations:
                if logo.score >= confidence_threshold:
                    # Convert pixel vertices to normalized bbox
                    vertices = logo.bounding_poly.vertices
                    x_coords = [v.x for v in vertices]
                    y_coords = [v.y for v in vertices]

                    # Normalize using image dimensions
                    width, height = image.size
                    bbox = {
                        'x': min(x_coords) / width,
                        'y': min(y_coords) / height,
                        'width': (max(x_coords) - min(x_coords)) / width,
                        'height': (max(y_coords) - min(y_coords)) / height
                    }

                    results.append({
                        'label': logo.description,
                        'confidence': logo.score,
                        'bbox': bbox
                    })
            return results
        except Exception as e:
            logger.error(f"GCP logo detection error: {e}")
            return []


class CLIPLogoDetectionAdapter(DetectionAdapter):
    """Local CLIP-based logo/brand detection"""

    def __init__(self, model_identifier="openai/clip-vit-base-patch32"):
        self.model_identifier = model_identifier
        self.model = None
        self.processor = None

    def _load_model(self):
        if not self.model:
            from transformers import CLIPProcessor, CLIPModel
            self.model = CLIPModel.from_pretrained(self.model_identifier)
            self.processor = CLIPProcessor.from_pretrained(self.model_identifier)

    def detect(self, image, confidence_threshold=0.5):
        try:
            self._load_model()
            from ..models import Brand

            # Get active brands from database
            active_brands = Brand.objects.filter(active=True)
            if not active_brands.exists():
                return []

            # Collect search terms
            text_prompts = []
            brand_mapping = {}
            for brand in active_brands:
                for term in brand.search_terms:
                    prompt = f"a photo containing {term}"
                    text_prompts.append(prompt)
                    brand_mapping[len(text_prompts) - 1] = brand.name
            text_prompts.append("a photo with no brands or logos")

            # CLIP inference
            inputs = self.processor(text=text_prompts, images=image, return_tensors="pt", padding=True)
            import torch
            with torch.no_grad():
                outputs = self.model(**inputs)
                probs = outputs.logits_per_image.softmax(dim=1)

            results = []
            for i, prob in enumerate(probs[0][:-1]):
                confidence = float(prob)
                if confidence > confidence_threshold and i in brand_mapping:
                    results.append({
                        'label': brand_mapping[i],
                        'confidence': confidence,
                        'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}  # Full frame for CLIP
                    })
            return sorted(results, key=lambda x: x['confidence'], reverse=True)[:5]
        except Exception as e:
            logger.error(f"CLIP logo detection error: {e}")
            return []


class LogoDetectionAdapterFactory(AdapterFactory):
    """Factory for logo detection adapters"""

    @staticmethod
    def create(provider_config):
        provider_type = provider_config.get('provider_type')
        if provider_type == 'gcp_vision':
            return GCPLogoDetectionAdapter()
        elif provider_type == 'local_clip':
            model_id = provider_config.get('model_identifier', 'openai/clip-vit-base-patch32')
            return CLIPLogoDetectionAdapter(model_id)
        else:
            raise ValueError(f"Unknown logo detection provider: {provider_type}")

View File

@@ -0,0 +1,131 @@
import logging
from .base import VideoAnalysisAdapter, AdapterFactory
import cv2
import numpy as np

logger = logging.getLogger(__name__)


class OpenCVMotionAnalysisAdapter(VideoAnalysisAdapter):
    """Local OpenCV-based motion analysis"""

    def analyze(self, video_path, **kwargs):
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                return {}

            # Initialize background subtractor
            backSub = cv2.createBackgroundSubtractorMOG2(detectShadows=True)
            frame_count = 0
            motion_scores = []

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Apply background subtraction
                fgMask = backSub.apply(frame)

                # Calculate motion score (percentage of changed pixels)
                motion_pixels = cv2.countNonZero(fgMask)
                total_pixels = fgMask.shape[0] * fgMask.shape[1]
                motion_score = motion_pixels / total_pixels
                motion_scores.append(motion_score)
                frame_count += 1

            cap.release()

            if motion_scores:
                return {
                    'average_motion': float(np.mean(motion_scores)),
                    'max_motion': float(np.max(motion_scores)),
                    'activity_score': float(np.mean(motion_scores) * 10),  # Scale to 0-10
                    'frame_count': frame_count
                }
            else:
                return {}
        except Exception as e:
            logger.error(f"Motion analysis error: {e}")
            return {}


class GCPVideoIntelligenceAdapter(VideoAnalysisAdapter):
    """Google Cloud Video Intelligence API adapter"""

    def __init__(self):
        from google.cloud import videointelligence
        self.client = videointelligence.VideoIntelligenceServiceClient()

    def analyze(self, video_path, **kwargs):
        try:
            # Read video file
            with open(video_path, 'rb') as video_file:
                input_content = video_file.read()

            # Configure analysis features
            from google.cloud import videointelligence
            features = [
                videointelligence.Feature.SHOT_CHANGE_DETECTION,
                videointelligence.Feature.OBJECT_TRACKING
            ]

            # Start analysis
            operation = self.client.annotate_video(
                request={
                    "features": features,
                    "input_content": input_content,
                }
            )

            # Wait for completion (this might take a while)
            result = operation.result(timeout=300)

            # Process results
            analysis_results = {}

            # Shot changes (scene transitions)
            if result.annotation_results[0].shot_annotations:
                shots = []
                for shot in result.annotation_results[0].shot_annotations:
                    shots.append({
                        'start_time': shot.start_time_offset.total_seconds(),
                        'end_time': shot.end_time_offset.total_seconds()
                    })
                analysis_results['shots'] = shots

            # Object tracking
            if result.annotation_results[0].object_annotations:
                objects = []
                for obj in result.annotation_results[0].object_annotations:
                    objects.append({
                        'entity': obj.entity.description,
                        'confidence': obj.confidence,
                        'frames': len(obj.frames)
                    })
                analysis_results['tracked_objects'] = objects

            return analysis_results
        except Exception as e:
            logger.error(f"GCP video intelligence error: {e}")
            return {}


class MotionAnalysisAdapterFactory(AdapterFactory):
    """Factory for motion analysis adapters"""

    @staticmethod
    def create(provider_config):
        provider_type = provider_config.get('provider_type')
        if provider_type == 'local_opencv':
            return OpenCVMotionAnalysisAdapter()
        elif provider_type == 'gcp_video_intelligence':
            return GCPVideoIntelligenceAdapter()
        else:
            raise ValueError(f"Unknown motion analysis provider: {provider_type}")

View File

@@ -0,0 +1,123 @@
import logging
from .base import DetectionAdapter, AdapterFactory
import io

logger = logging.getLogger(__name__)


class GCPObjectDetectionAdapter(DetectionAdapter):
    """Google Cloud Vision object detection"""

    def __init__(self):
        from google.cloud import vision
        self.client = vision.ImageAnnotatorClient()

    def detect(self, image, confidence_threshold=0.5):
        try:
            # Convert image to bytes
            img_byte_arr = io.BytesIO()
            image.save(img_byte_arr, format='JPEG')
            image_bytes = img_byte_arr.getvalue()

            # GCP Vision API call
            from google.cloud import vision
            vision_image = vision.Image(content=image_bytes)
            response = self.client.object_localization(image=vision_image)

            results = []
            for obj in response.localized_object_annotations:
                if obj.score >= confidence_threshold:
                    # Convert normalized vertices to bbox
                    vertices = obj.bounding_poly.normalized_vertices
                    x_coords = [v.x for v in vertices]
                    y_coords = [v.y for v in vertices]
                    bbox = {
                        'x': min(x_coords),
                        'y': min(y_coords),
                        'width': max(x_coords) - min(x_coords),
                        'height': max(y_coords) - min(y_coords)
                    }
                    results.append({
                        'label': obj.name,
                        'confidence': obj.score,
                        'bbox': bbox
                    })
            return results
        except Exception as e:
            logger.error(f"GCP object detection error: {e}")
            return []


class YOLOObjectDetectionAdapter(DetectionAdapter):
    """Local YOLO object detection"""

    def __init__(self, model_path="yolov8n.pt"):
        self.model_path = model_path
        self.model = None

    def _load_model(self):
        if not self.model:
            from ultralytics import YOLO
            self.model = YOLO(self.model_path)

    def detect(self, image, confidence_threshold=0.5):
        try:
            self._load_model()

            # Convert PIL to OpenCV format
            import cv2
            import numpy as np
            img_array = np.array(image)

            # YOLO inference
            results = self.model(img_array, conf=confidence_threshold)

            detections = []
            for result in results:
                boxes = result.boxes
                if boxes is not None:
                    for box in boxes:
                        # Get normalized coordinates
                        x1, y1, x2, y2 = box.xyxyn[0].tolist()
                        confidence = float(box.conf[0])
                        class_id = int(box.cls[0])
                        class_name = self.model.names[class_id]

                        bbox = {
                            'x': x1,
                            'y': y1,
                            'width': x2 - x1,
                            'height': y2 - y1
                        }
                        detections.append({
                            'label': class_name,
                            'confidence': confidence,
                            'bbox': bbox
                        })
            return detections
        except Exception as e:
            logger.error(f"YOLO object detection error: {e}")
            return []


class ObjectDetectionAdapterFactory(AdapterFactory):
    """Factory for object detection adapters"""

    @staticmethod
    def create(provider_config):
        provider_type = provider_config.get('provider_type')
        if provider_type == 'gcp_vision':
            return GCPObjectDetectionAdapter()
        elif provider_type == 'local_yolo':
            model_path = provider_config.get('model_identifier', 'yolov8n.pt')
            return YOLOObjectDetectionAdapter(model_path)
        else:
            raise ValueError(f"Unknown object detection provider: {provider_type}")

View File

@@ -0,0 +1,125 @@
import logging
from .base import DetectionAdapter, AdapterFactory
import io

logger = logging.getLogger(__name__)


class GCPTextDetectionAdapter(DetectionAdapter):
    """Google Cloud Vision text detection (OCR)"""

    def __init__(self):
        from google.cloud import vision
        self.client = vision.ImageAnnotatorClient()

    def detect(self, image, confidence_threshold=0.5):
        try:
            # Convert image to bytes
            img_byte_arr = io.BytesIO()
            image.save(img_byte_arr, format='JPEG')
            image_bytes = img_byte_arr.getvalue()

            # GCP Vision API call
            from google.cloud import vision
            vision_image = vision.Image(content=image_bytes)
            response = self.client.text_detection(image=vision_image)

            results = []
            # Skip first annotation (full text), process individual words/lines
            for text in response.text_annotations[1:]:
                vertices = text.bounding_poly.vertices
                x_coords = [v.x for v in vertices]
                y_coords = [v.y for v in vertices]

                # Normalize using image dimensions
                width, height = image.size
                bbox = {
                    'x': min(x_coords) / width,
                    'y': min(y_coords) / height,
                    'width': (max(x_coords) - min(x_coords)) / width,
                    'height': (max(y_coords) - min(y_coords)) / height
                }
                results.append({
                    'label': text.description,
                    'confidence': 1.0,  # GCP doesn't provide text confidence
                    'bbox': bbox
                })
            return results
        except Exception as e:
            logger.error(f"GCP text detection error: {e}")
            return []


class TesseractTextDetectionAdapter(DetectionAdapter):
    """Local Tesseract OCR adapter"""

    def __init__(self):
        try:
            import pytesseract
            self.tesseract = pytesseract
        except ImportError:
            logger.error("pytesseract not installed")
            self.tesseract = None

    def detect(self, image, confidence_threshold=0.5):
        if not self.tesseract:
            return []
        try:
            import cv2
            import numpy as np

            # Convert to OpenCV format
            img_array = np.array(image)
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

            # Get bounding box data
            data = self.tesseract.image_to_data(gray, output_type=self.tesseract.Output.DICT)

            results = []
            height, width = gray.shape
            for i in range(len(data['text'])):
                text = data['text'][i].strip()
                conf = int(data['conf'][i])
                if text and conf > (confidence_threshold * 100):
                    # Normalize coordinates
                    x, y, w, h = data['left'][i], data['top'][i], data['width'][i], data['height'][i]
                    bbox = {
                        'x': x / width,
                        'y': y / height,
                        'width': w / width,
                        'height': h / height
                    }
                    results.append({
                        'label': text,
                        'confidence': conf / 100.0,
                        'bbox': bbox
                    })
            return results
        except Exception as e:
            logger.error(f"Tesseract text detection error: {e}")
            return []


class TextDetectionAdapterFactory(AdapterFactory):
    """Factory for text detection adapters"""

    @staticmethod
    def create(provider_config):
        provider_type = provider_config.get('provider_type')
        if provider_type == 'gcp_vision':
            return GCPTextDetectionAdapter()
        elif provider_type == 'local_tesseract':
            return TesseractTextDetectionAdapter()
        else:
            raise ValueError(f"Unknown text detection provider: {provider_type}")

View File

@@ -0,0 +1,128 @@
import cv2
import numpy as np
from PIL import Image
import logging
from .adapters.object_detection import ObjectDetectionAdapterFactory
from .adapters.logo_detection import LogoDetectionAdapterFactory
from .adapters.text_detection import TextDetectionAdapterFactory
from .adapters.motion_analysis import MotionAnalysisAdapterFactory

logger = logging.getLogger(__name__)


class AnalysisEngine:
    """Main analysis engine that orchestrates capability-specific adapters"""

    def __init__(self):
        self.object_detector = None
        self.logo_detector = None
        self.text_detector = None
        self.motion_analyzer = None

    def configure_providers(self, provider_config):
        """Configure adapters based on provider settings"""
        if 'object_detection' in provider_config:
            self.object_detector = ObjectDetectionAdapterFactory.create(
                provider_config['object_detection']
            )
        if 'logo_detection' in provider_config:
            self.logo_detector = LogoDetectionAdapterFactory.create(
                provider_config['logo_detection']
            )
        if 'text_detection' in provider_config:
            self.text_detector = TextDetectionAdapterFactory.create(
                provider_config['text_detection']
            )
        if 'motion_analysis' in provider_config:
            self.motion_analyzer = MotionAnalysisAdapterFactory.create(
                provider_config['motion_analysis']
            )

    def extract_frame_from_segment(self, segment_path, timestamp=None):
        """Extract frame from video segment"""
        try:
            cap = cv2.VideoCapture(segment_path)
            if not cap.isOpened():
                return None

            # For TS segments, seeking is problematic; just read the first frame.
            # This is suitable for real-time analysis where any frame is representative.
            ret, frame = cap.read()
            cap.release()

            if ret:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                return Image.fromarray(frame_rgb)
            return None
        except Exception as e:
            logger.error(f"Error extracting frame: {e}")
            return None

    def analyze_frame(self, image, requested_analysis, confidence_threshold=0.5):
        """Analyze a single frame using configured adapters"""
        results = {}

        # Object detection
        if 'object_detection' in requested_analysis and self.object_detector:
            results['objects'] = self.object_detector.detect(image, confidence_threshold)

        # Logo detection
        if 'logo_detection' in requested_analysis and self.logo_detector:
            results['logos'] = self.logo_detector.detect(image, confidence_threshold)

        # Text detection
        if 'text_detection' in requested_analysis and self.text_detector:
            results['text'] = self.text_detector.detect(image, confidence_threshold)

        # Visual properties (always computed locally)
        if 'visual_analysis' in requested_analysis:
            results['visual'] = self._analyze_visual_properties(image)

        return results

    def analyze_video_segment(self, segment_path, requested_analysis):
        """Analyze video segment for temporal features"""
        results = {}

        # Motion analysis
        if 'motion_analysis' in requested_analysis and self.motion_analyzer:
            results['motion'] = self.motion_analyzer.analyze(segment_path)

        return results

    def _analyze_visual_properties(self, image):
        """Local visual property analysis"""
        img_array = np.array(image)

        # Dominant colors
        dominant_colors = self._get_dominant_colors(img_array)

        # Visual metrics
        brightness = float(np.mean(img_array)) / 255.0
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        contrast = float(np.std(gray)) / 255.0
        hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
        saturation = float(np.mean(hsv[:, :, 1])) / 255.0

        return {
            'dominant_colors': dominant_colors,
            'brightness_level': brightness,
            'contrast_level': contrast,
            'saturation_level': saturation
        }

    def _get_dominant_colors(self, image_array, k=3):
        try:
            data = image_array.reshape((-1, 3))
            data = np.float32(data)
            criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
            _, labels, centers = cv2.kmeans(data, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
            return centers.astype(int).tolist()
        except Exception:
            # Fall back to mid-gray if clustering fails
            return [[128, 128, 128]]
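As a sanity check on the orchestration pattern above, here is a stripped-down stand-in: a stub adapter and a miniature engine that dispatches only the requested analyses. All names here are illustrative; the real engine wires adapter factories instead of taking instances directly:

```python
class StubLogoDetector:
    """Stand-in for a capability adapter; always reports one fake detection."""
    def detect(self, image, confidence_threshold=0.5):
        return [{'label': 'Apple', 'confidence': 0.91,
                 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1}}]


class MiniEngine:
    """Miniature version of the engine's dispatch logic, for illustration."""
    def __init__(self):
        self.logo_detector = None

    def configure(self, adapters):
        self.logo_detector = adapters.get('logo_detection')

    def analyze_frame(self, image, requested_analysis, confidence_threshold=0.5):
        results = {}
        # Run only the analyses that were both requested and configured
        if 'logo_detection' in requested_analysis and self.logo_detector:
            results['logos'] = self.logo_detector.detect(image, confidence_threshold)
        return results


engine = MiniEngine()
engine.configure({'logo_detection': StubLogoDetector()})
out = engine.analyze_frame(image=None, requested_analysis=['logo_detection'])
```

The key property this demonstrates: an analysis type that is requested but not configured (or configured but not requested) simply produces no entry in the results dict, so providers can be mixed per capability without special-casing.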

View File

@@ -0,0 +1,70 @@
[
{
"model": "ai_processing.analysisprovider",
"pk": "550e8400-e29b-41d4-a716-446655440001",
"fields": {
"name": "Google Cloud Vision",
"provider_type": "gcp_vision",
"model_identifier": "cloud-vision-api",
"capabilities": ["object_detection", "logo_detection", "text_detection"],
"active": true,
"api_config": {}
}
},
{
"model": "ai_processing.analysisprovider",
"pk": "550e8400-e29b-41d4-a716-446655440002",
"fields": {
"name": "Local CLIP",
"provider_type": "local_clip",
"model_identifier": "openai/clip-vit-base-patch32",
"capabilities": ["logo_detection", "scene_analysis"],
"active": false,
"api_config": {}
}
},
{
"model": "ai_processing.brand",
"pk": "550e8400-e29b-41d4-a716-446655440010",
"fields": {
"name": "Apple",
"search_terms": ["Apple logo", "iPhone", "MacBook", "iPad"],
"active": true,
"category": "tech",
"created_at": "2025-08-18T00:00:00Z"
}
},
{
"model": "ai_processing.brand",
"pk": "550e8400-e29b-41d4-a716-446655440011",
"fields": {
"name": "Google",
"search_terms": ["Google logo", "Android", "Pixel phone"],
"active": true,
"category": "tech",
"created_at": "2025-08-18T00:00:00Z"
}
},
{
"model": "ai_processing.brand",
"pk": "550e8400-e29b-41d4-a716-446655440012",
"fields": {
"name": "Nike",
"search_terms": ["Nike logo", "Nike swoosh"],
"active": true,
"category": "fashion",
"created_at": "2025-08-18T00:00:00Z"
}
},
{
"model": "ai_processing.brand",
"pk": "550e8400-e29b-41d4-a716-446655440013",
"fields": {
"name": "Coca Cola",
"search_terms": ["Coca Cola logo", "Coke bottle"],
"active": true,
"category": "beverage",
"created_at": "2025-08-18T00:00:00Z"
}
}
]
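A quick structural check of the fixture entries above can be done with the stdlib (in the project they would be loaded with `python manage.py loaddata <fixture>`; the exact fixture filename is not shown in this diff, and the two entries below are shortened copies for illustration):

```python
import json

# Two representative entries copied (abridged) from the fixture above
fixture = json.loads("""
[
  {"model": "ai_processing.analysisprovider",
   "pk": "550e8400-e29b-41d4-a716-446655440002",
   "fields": {"name": "Local CLIP", "provider_type": "local_clip",
              "model_identifier": "openai/clip-vit-base-patch32",
              "capabilities": ["logo_detection", "scene_analysis"],
              "active": false, "api_config": {}}},
  {"model": "ai_processing.brand",
   "pk": "550e8400-e29b-41d4-a716-446655440010",
   "fields": {"name": "Apple", "search_terms": ["Apple logo", "iPhone"],
              "active": true, "category": "tech",
              "created_at": "2025-08-18T00:00:00Z"}}
]
""")

# The same filtering the app does at runtime: brands vs. logo-capable providers
brands = [e['fields']['name'] for e in fixture if e['model'] == 'ai_processing.brand']
logo_capable = [e['fields']['name'] for e in fixture
                if e['model'] == 'ai_processing.analysisprovider'
                and 'logo_detection' in e['fields']['capabilities']]
```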

View File

@@ -0,0 +1,120 @@
# Generated by Django 5.0.6 on 2025-08-18 04:34

import django.db.models.deletion
import uuid
from django.db import migrations, models


class Migration(migrations.Migration):

    initial = True

    dependencies = [
    ]

    operations = [
        migrations.CreateModel(
            name='AnalysisProvider',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('name', models.CharField(max_length=100, unique=True)),
                ('provider_type', models.CharField(max_length=50)),
                ('model_identifier', models.CharField(max_length=200)),
                ('capabilities', models.JSONField(default=list)),
                ('active', models.BooleanField(default=True)),
                ('api_config', models.JSONField(default=dict)),
            ],
        ),
        migrations.CreateModel(
            name='Brand',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('name', models.CharField(max_length=100, unique=True)),
                ('search_terms', models.JSONField(default=list)),
                ('active', models.BooleanField(default=True)),
                ('category', models.CharField(max_length=50, null=True)),
                ('created_at', models.DateTimeField(auto_now_add=True)),
            ],
        ),
        migrations.CreateModel(
            name='ProcessingQueue',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('stream_id', models.CharField(max_length=100)),
                ('segment_path', models.CharField(max_length=500)),
                ('priority', models.IntegerField(default=0)),
                ('status', models.CharField(choices=[('pending', 'Pending'), ('processing', 'Processing'), ('completed', 'Completed'), ('failed', 'Failed')], default='pending', max_length=20)),
                ('analysis_types', models.JSONField(default=list)),
                ('created_at', models.DateTimeField(auto_now_add=True)),
                ('started_at', models.DateTimeField(null=True)),
                ('completed_at', models.DateTimeField(null=True)),
                ('error_message', models.TextField(null=True)),
            ],
            options={
                'indexes': [models.Index(fields=['status', 'priority'], name='ai_processi_status_958bda_idx'), models.Index(fields=['stream_id'], name='ai_processi_stream__60b035_idx')],
            },
        ),
        migrations.CreateModel(
            name='VideoAnalysis',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('stream_id', models.CharField(max_length=100)),
                ('segment_path', models.CharField(max_length=500)),
                ('timestamp', models.DateTimeField(auto_now_add=True)),
                ('processing_time', models.FloatField(null=True)),
                ('analysis_type', models.CharField(max_length=50)),
                ('confidence_threshold', models.FloatField(default=0.5)),
                ('frame_timestamp', models.FloatField()),
                ('external_request_id', models.CharField(max_length=200, null=True)),
                ('provider', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='ai_processing.analysisprovider')),
            ],
        ),
        migrations.CreateModel(
            name='DetectionResult',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('label', models.CharField(max_length=200)),
                ('confidence', models.FloatField()),
                ('bbox_x', models.FloatField()),
                ('bbox_y', models.FloatField()),
                ('bbox_width', models.FloatField()),
                ('bbox_height', models.FloatField()),
                ('detection_type', models.CharField(max_length=50)),
                ('metadata', models.JSONField(default=dict)),
                ('analysis', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='detections', to='ai_processing.videoanalysis')),
            ],
        ),
        migrations.CreateModel(
            name='VisualAnalysis',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('dominant_colors', models.JSONField(default=list)),
                ('brightness_level', models.FloatField()),
                ('contrast_level', models.FloatField(null=True)),
                ('saturation_level', models.FloatField(null=True)),
                ('activity_score', models.FloatField(null=True)),
                ('scene_description', models.TextField(null=True)),
                ('analysis', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='visual', to='ai_processing.videoanalysis')),
            ],
        ),
        migrations.AddIndex(
            model_name='videoanalysis',
            index=models.Index(fields=['stream_id', 'timestamp'], name='ai_processi_stream__b961da_idx'),
        ),
        migrations.AddIndex(
            model_name='videoanalysis',
            index=models.Index(fields=['analysis_type'], name='ai_processi_analysi_774d33_idx'),
        ),
        migrations.AddIndex(
            model_name='detectionresult',
            index=models.Index(fields=['label'], name='ai_processi_label_20bc74_idx'),
        ),
        migrations.AddIndex(
            model_name='detectionresult',
            index=models.Index(fields=['confidence'], name='ai_processi_confide_123ee5_idx'),
        ),
        migrations.AddIndex(
            model_name='detectionresult',
            index=models.Index(fields=['detection_type'], name='ai_processi_detecti_c390f2_idx'),
        ),
    ]

View File

@@ -1,3 +1,150 @@
from django.db import models
import uuid


class AnalysisProvider(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100, unique=True)
    provider_type = models.CharField(max_length=50)
    model_identifier = models.CharField(max_length=200)
    capabilities = models.JSONField(default=list)
    active = models.BooleanField(default=True)
    api_config = models.JSONField(default=dict)

    def __str__(self):
        return self.name


class Brand(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100, unique=True)
    search_terms = models.JSONField(default=list)  # ["Apple logo", "iPhone", "MacBook"]
    active = models.BooleanField(default=True)
    category = models.CharField(max_length=50, null=True)
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.name


class VideoAnalysis(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    stream_id = models.CharField(max_length=100)
    segment_path = models.CharField(max_length=500)
    timestamp = models.DateTimeField(auto_now_add=True)
    processing_time = models.FloatField(null=True)
    provider = models.ForeignKey(AnalysisProvider, on_delete=models.CASCADE)
    analysis_type = models.CharField(max_length=50)
    confidence_threshold = models.FloatField(default=0.5)
    frame_timestamp = models.FloatField()
    external_request_id = models.CharField(max_length=200, null=True)

    def to_dict(self):
        return {
            'id': str(self.id),
            'stream_id': self.stream_id,
            'timestamp': self.timestamp.isoformat(),
            'processing_time': self.processing_time,
            'analysis_type': self.analysis_type,
            'frame_timestamp': self.frame_timestamp,
            'provider': self.provider.name,
            'detections': [d.to_dict() for d in self.detections.all()],
            'visual': self.visual.to_dict() if hasattr(self, 'visual') else None
        }

    class Meta:
        indexes = [
            models.Index(fields=['stream_id', 'timestamp']),
            models.Index(fields=['analysis_type']),
        ]


class DetectionResult(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    analysis = models.ForeignKey(VideoAnalysis, on_delete=models.CASCADE, related_name='detections')
    label = models.CharField(max_length=200)
    confidence = models.FloatField()
    bbox_x = models.FloatField()
    bbox_y = models.FloatField()
    bbox_width = models.FloatField()
    bbox_height = models.FloatField()
    detection_type = models.CharField(max_length=50)  # 'object', 'logo', 'text'
    metadata = models.JSONField(default=dict)  # Provider-specific data

    def to_dict(self):
        return {
            'id': str(self.id),
            'label': self.label,
            'confidence': self.confidence,
            'bbox': {
                'x': self.bbox_x,
                'y': self.bbox_y,
                'width': self.bbox_width,
                'height': self.bbox_height
            },
            'detection_type': self.detection_type,
            'metadata': self.metadata
        }

    class Meta:
        indexes = [
            models.Index(fields=['label']),
            models.Index(fields=['confidence']),
            models.Index(fields=['detection_type']),
        ]


class VisualAnalysis(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    analysis = models.OneToOneField(VideoAnalysis, on_delete=models.CASCADE, related_name='visual')
    dominant_colors = models.JSONField(default=list)
    brightness_level = models.FloatField()
    contrast_level = models.FloatField(null=True)
    saturation_level = models.FloatField(null=True)
    activity_score = models.FloatField(null=True)
    scene_description = models.TextField(null=True)

    def to_dict(self):
        return {
            'dominant_colors': self.dominant_colors,
            'brightness_level': self.brightness_level,
            'contrast_level': self.contrast_level,
            'saturation_level': self.saturation_level,
            'activity_score': self.activity_score,
            'scene_description': self.scene_description
        }


class ProcessingQueue(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    stream_id = models.CharField(max_length=100)
    segment_path = models.CharField(max_length=500)
    priority = models.IntegerField(default=0)
    status = models.CharField(max_length=20, choices=[
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed')
    ], default='pending')
    analysis_types = models.JSONField(default=list)  # ['yolo', 'clip_brands', 'clip_scene']
    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True)
    completed_at = models.DateTimeField(null=True)
    error_message = models.TextField(null=True)

    class Meta:
        indexes = [
            models.Index(fields=['status', 'priority']),
            models.Index(fields=['stream_id']),
        ]

View File

@@ -0,0 +1,98 @@
import logging

from ..analysis_engine import AnalysisEngine
from ..models import AnalysisProvider, ProcessingQueue
from ..tasks import process_video_segment

logger = logging.getLogger(__name__)


class VideoAnalyzer:
    """Main video analysis coordinator"""

    def __init__(self):
        self.engine = AnalysisEngine()
        self.configured = False

    def setup_providers(self):
        """Configure analysis engine with active providers"""
        try:
            providers = AnalysisProvider.objects.filter(active=True)
            if not providers.exists():
                # Activate local CLIP as fallback
                clip_provider = AnalysisProvider.objects.filter(
                    provider_type='local_clip'
                ).first()
                if clip_provider:
                    clip_provider.active = True
                    clip_provider.save()
                    providers = [clip_provider]

            config = {}
            for provider in providers:
                if 'logo_detection' in provider.capabilities:
                    config['logo_detection'] = {
                        'provider_type': provider.provider_type,
                        'model_identifier': provider.model_identifier
                    }
                if 'object_detection' in provider.capabilities:
                    config['object_detection'] = {
                        'provider_type': provider.provider_type,
                        'model_identifier': provider.model_identifier
                    }

            if config:
                self.engine.configure_providers(config)
                self.configured = True
                logger.info(f"Configured providers: {list(config.keys())}")
            else:
                logger.warning("No providers with supported capabilities found")
        except Exception as e:
            logger.error(f"Error setting up providers: {e}")

    def queue_segment_analysis(self, stream_id, segment_path):
        """Queue video segment for analysis"""
        try:
            # Check if already queued
            existing = ProcessingQueue.objects.filter(
                stream_id=stream_id,
                segment_path=segment_path,
                status__in=['pending', 'processing']
            ).exists()
            if existing:
                logger.debug(f"Segment already queued: {segment_path}")
                return False

            # Create queue item
            queue_item = ProcessingQueue.objects.create(
                stream_id=stream_id,
                segment_path=segment_path,
                analysis_types=['logo_detection'],
                priority=1
            )

            # Trigger async processing
            process_video_segment.delay(stream_id, segment_path)
            logger.info(f"Queued segment for analysis: {segment_path}")
            return True
        except Exception as e:
            logger.error(f"Error queuing segment: {e}")
            return False

    def analyze_frame_sync(self, image, analysis_types=None, confidence_threshold=0.3):
        """Synchronous frame analysis for testing"""
        if analysis_types is None:  # avoid a mutable default argument
            analysis_types = ['logo_detection']
        if not self.configured:
            self.setup_providers()
        if not self.configured:
            return {"error": "No providers configured"}
        try:
            results = self.engine.analyze_frame(image, analysis_types, confidence_threshold)
            return results
        except Exception as e:
            logger.error(f"Frame analysis error: {e}")
            return {"error": str(e)}
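The capability-to-config mapping inside `setup_providers` can be exercised standalone with plain dicts standing in for `AnalysisProvider` rows (a sketch; the two sample providers mirror the fixture data, and with several active providers the last one wins per capability, as in the loop above):

```python
def build_engine_config(providers, wanted=('logo_detection', 'object_detection')):
    """Map active providers to a per-capability engine config (later providers win)."""
    config = {}
    for p in providers:
        if not p['active']:
            continue
        for cap in wanted:
            if cap in p['capabilities']:
                config[cap] = {
                    'provider_type': p['provider_type'],
                    'model_identifier': p['model_identifier'],
                }
    return config

providers = [
    {'provider_type': 'gcp_vision', 'model_identifier': 'cloud-vision-api',
     'capabilities': ['object_detection', 'logo_detection', 'text_detection'],
     'active': False},
    {'provider_type': 'local_clip', 'model_identifier': 'openai/clip-vit-base-patch32',
     'capabilities': ['logo_detection', 'scene_analysis'],
     'active': True},
]
config = build_engine_config(providers)
# Only the active CLIP provider contributes, and only for logo detection
```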

View File

@@ -0,0 +1,199 @@
from celery import shared_task
import logging
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

from .analysis_engine import AnalysisEngine
from .models import VideoAnalysis, DetectionResult, VisualAnalysis, ProcessingQueue, AnalysisProvider

logger = logging.getLogger(__name__)
channel_layer = get_channel_layer()


@shared_task(bind=True)
def process_video_segment(self, stream_id, segment_path):
    """Process a video segment with AI analysis"""
    queue_item = None  # defined up front so the except block can always reference it
    try:
        # Update queue status
        queue_item = ProcessingQueue.objects.filter(
            stream_id=stream_id,
            segment_path=segment_path,
            status='pending'
        ).first()
        if queue_item:
            queue_item.status = 'processing'
            queue_item.save()

        # Initialize analysis engine
        engine = AnalysisEngine()

        # Debug: Check all providers
        all_providers = AnalysisProvider.objects.all()
        logger.info(f"Found {all_providers.count()} total providers:")
        for p in all_providers:
            logger.info(f"  - {p.name}: {p.provider_type} (active: {p.active})")

        # Get logo detection provider
        logo_provider = AnalysisProvider.objects.filter(
            provider_type='local_clip'
        ).first()
        if not logo_provider:
            logger.error("No CLIP provider found in database at all!")
            if queue_item:
                queue_item.status = 'failed'
                queue_item.error_message = 'No CLIP provider in database'
                queue_item.save()
            return {"error": "No CLIP provider in database"}

        logger.info(f"Found CLIP provider: {logo_provider.name} (active: {logo_provider.active})")
        if not logo_provider.active:
            logo_provider.active = True
            logo_provider.save()
            logger.info(f"Auto-activated CLIP provider: {logo_provider.name}")

        if logo_provider:
            # Configure engine with logo detection
            config = {
                'logo_detection': {
                    'provider_type': 'local_clip',
                    'model_identifier': logo_provider.model_identifier
                }
            }
            logger.info(f"Configuring engine with config: {config}")
            engine.configure_providers(config)
            logger.info("Engine configuration completed")

            # Extract frame from segment
            logger.info(f"Extracting frame from: {segment_path}")
            frame = engine.extract_frame_from_segment(segment_path)
            if frame:
                logger.info(f"Frame extracted successfully, size: {frame.size}")

                # Analyze frame for logos
                logger.info("Starting frame analysis...")
                analysis_results = engine.analyze_frame(
                    frame,
                    ['logo_detection', 'visual_analysis'],
                    confidence_threshold=0.3
                )
                logger.info(f"Analysis results: {analysis_results}")

                # Store analysis results
                analysis = VideoAnalysis.objects.create(
                    stream_id=stream_id,
                    segment_path=segment_path,
                    provider=logo_provider,
                    analysis_type='logo_detection',
                    frame_timestamp=0.0,
                    confidence_threshold=0.3
                )

                # Store detections
                detections = []
                if 'logos' in analysis_results:
                    for logo in analysis_results['logos']:
                        detection = DetectionResult.objects.create(
                            analysis=analysis,
                            label=logo['label'],
                            confidence=logo['confidence'],
                            bbox_x=logo['bbox']['x'],
                            bbox_y=logo['bbox']['y'],
                            bbox_width=logo['bbox']['width'],
                            bbox_height=logo['bbox']['height'],
                            detection_type='logo'
                        )
                        detections.append(detection.to_dict())

                # Store visual analysis
                if 'visual' in analysis_results:
                    VisualAnalysis.objects.create(
                        analysis=analysis,
                        dominant_colors=analysis_results['visual']['dominant_colors'],
                        brightness_level=analysis_results['visual']['brightness_level'],
                        contrast_level=analysis_results['visual']['contrast_level'],
                        saturation_level=analysis_results['visual']['saturation_level']
                    )

                # Send results via WebSocket
                if detections:
                    async_to_sync(channel_layer.group_send)(
                        f"stream_{stream_id}",
                        {
                            "type": "analysis_update",
                            "analysis": analysis.to_dict()
                        }
                    )

                # Update queue status
                if queue_item:
                    queue_item.status = 'completed'
                    queue_item.save()

                logger.info(f"Processed segment {segment_path}: {len(detections)} detections")
                return {"detections": len(detections), "analysis_id": str(analysis.id)}
            else:
                logger.error("Failed to extract frame from segment")
                if queue_item:
                    queue_item.status = 'failed'
                    queue_item.error_message = 'Failed to extract frame from video segment'
                    queue_item.save()
                return {"error": "Failed to extract frame from segment"}

        # No provider configured
        if queue_item:
            queue_item.status = 'failed'
            queue_item.error_message = 'No active AI provider configured'
            queue_item.save()
        return {"error": "No AI provider configured"}
    except Exception as e:
        logger.error(f"Error processing segment {segment_path}: {e}")
        if queue_item:
            queue_item.status = 'failed'
            queue_item.error_message = str(e)
            queue_item.save()
        raise self.retry(countdown=60, max_retries=3)


@shared_task
def analyze_frame_task(stream_id, segment_path, frame_timestamp=0.0):
    """Analyze a single frame from video segment"""
    try:
        engine = AnalysisEngine()

        # Get active providers
        providers = AnalysisProvider.objects.filter(active=True)
        if not providers.exists():
            return {"error": "No active providers"}

        # Configure engine
        config = {}
        for provider in providers:
            if 'logo_detection' in provider.capabilities:
                config['logo_detection'] = {
                    'provider_type': provider.provider_type,
                    'model_identifier': provider.model_identifier
                }
        engine.configure_providers(config)

        # Extract and analyze frame
        frame = engine.extract_frame_from_segment(segment_path, frame_timestamp)
        if not frame:
            return {"error": "Could not extract frame"}

        results = engine.analyze_frame(frame, ['logo_detection', 'visual_analysis'])
        return {
            "stream_id": stream_id,
            "results": results,
            "frame_timestamp": frame_timestamp
        }
    except Exception as e:
        logger.error(f"Frame analysis error: {e}")
        return {"error": str(e)}

View File

@@ -0,0 +1,8 @@
from django.urls import path
from . import views

urlpatterns = [
    path('streams/<str:stream_id>/analysis/', views.stream_analysis, name='stream_analysis'),
    path('providers/', views.providers, name='providers'),
    path('brands/', views.brands, name='brands'),
]

View File

@@ -1,3 +1,31 @@
from django.shortcuts import render
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods

from ai_processing.models import VideoAnalysis, AnalysisProvider, Brand


@require_http_methods(["GET"])
def stream_analysis(request, stream_id):
    analyses = VideoAnalysis.objects.filter(stream_id=stream_id).order_by('-timestamp')
    return JsonResponse({'results': [a.to_dict() for a in analyses]})


@require_http_methods(["GET"])
def providers(request):
    providers = AnalysisProvider.objects.filter(active=True)
    return JsonResponse({
        'providers': [
            {'id': str(p.id), 'name': p.name, 'capabilities': p.capabilities}
            for p in providers
        ]
    })


@require_http_methods(["GET"])
def brands(request):
    brands = Brand.objects.filter(active=True)
    return JsonResponse({
        'brands': [
            {'id': str(b.id), 'name': b.name, 'category': b.category}
            for b in brands
        ]
    })

View File

@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)

View File

@@ -8,9 +8,25 @@ https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/
"""

import os

from django.core.asgi import get_asgi_application

# Set Django settings first
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')

# Initialize Django ASGI application early to ensure the AppRegistry is populated
django_asgi_app = get_asgi_application()

# Now import channels and consumers after Django is initialized
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.urls import path

from streaming.consumers import StreamAnalysisConsumer

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(
        URLRouter([
            path("ws/stream/<str:stream_id>/", StreamAnalysisConsumer.as_asgi()),
        ])
    ),
})

View File

@@ -0,0 +1,18 @@
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
app = Celery('media_analyzer')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

View File

@@ -75,6 +75,25 @@ TEMPLATES = [
]

WSGI_APPLICATION = "media_analyzer.wsgi.application"
ASGI_APPLICATION = "media_analyzer.asgi.application"

# Channels configuration
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

# Database

View File

@@ -7,3 +7,8 @@ redis==5.0.1
ffmpeg-python==0.2.0
Pillow==10.0.1
django-cors-headers==4.3.1
torch==2.1.0
torchvision==0.16.0
transformers==4.36.0
opencv-python==4.8.1.78
numpy==1.24.3

View File

@@ -0,0 +1,28 @@
#!/usr/bin/env python
"""Setup AI providers for development"""
import os
import django
import sys
from pathlib import Path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()
from ai_processing.models import AnalysisProvider
# Ensure CLIP is active
clip = AnalysisProvider.objects.get(provider_type='local_clip')
clip.active = True
clip.save()
# Ensure GCP is inactive
gcp = AnalysisProvider.objects.get(provider_type='gcp_vision')
gcp.active = False
gcp.save()
print(f"✅ CLIP active: {clip.active}")
print(f"❌ GCP active: {gcp.active}")
print("🚀 Ready for logo detection!")

View File

@@ -0,0 +1,82 @@
import json
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async

from ai_processing.models import VideoAnalysis

logger = logging.getLogger(__name__)


class StreamAnalysisConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer for real-time analysis updates"""

    async def connect(self):
        self.stream_id = self.scope['url_route']['kwargs']['stream_id']
        self.room_group_name = f'stream_{self.stream_id}'

        # Join stream group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()
        logger.info(f"WebSocket connected for stream {self.stream_id}")

        # Send recent analysis results
        await self.send_recent_analysis()

    async def disconnect(self, close_code):
        # Leave stream group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
        logger.info(f"WebSocket disconnected for stream {self.stream_id}")

    async def receive(self, text_data):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(text_data)
            message_type = data.get('type')
            if message_type == 'ping':
                await self.send(text_data=json.dumps({
                    'type': 'pong',
                    'timestamp': data.get('timestamp')
                }))
            elif message_type == 'request_analysis':
                # Trigger analysis if needed
                pass
        except json.JSONDecodeError:
            logger.error("Invalid JSON received")

    async def analysis_update(self, event):
        """Handle analysis update from task"""
        await self.send(text_data=json.dumps({
            'type': 'analysis_update',
            'analysis': event['analysis']
        }))

    @database_sync_to_async
    def get_recent_analysis(self):
        """Get recent analysis results for stream"""
        try:
            analyses = VideoAnalysis.objects.filter(
                stream_id=self.stream_id
            ).order_by('-timestamp')[:5]
            return [analysis.to_dict() for analysis in analyses]
        except Exception as e:
            logger.error(f"Error getting recent analysis: {e}")
            return []

    async def send_recent_analysis(self):
        """Send recent analysis results to client"""
        recent_analyses = await self.get_recent_analysis()
        if recent_analyses:
            await self.send(text_data=json.dumps({
                'type': 'recent_analysis',
                'analyses': recent_analyses
            }))
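The consumer's ping/pong protocol can be checked offline with a small mirror of the `receive` dispatch (a sketch for illustration; `handle_message` is a hypothetical helper, and the field values are made up, but the payload shapes match what the consumer emits):

```python
import json

def handle_message(text_data):
    """Mirror of StreamAnalysisConsumer.receive() for offline testing."""
    data = json.loads(text_data)
    if data.get('type') == 'ping':
        # Echo the client timestamp back, as the consumer does
        return json.dumps({'type': 'pong', 'timestamp': data.get('timestamp')})
    # 'request_analysis' and unknown types currently produce no reply
    return None

reply = handle_message(json.dumps({'type': 'ping', 'timestamp': 123}))
```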

View File

@@ -0,0 +1,84 @@
import os
import time
import logging
from pathlib import Path

from django.conf import settings

from ai_processing.processors.video_analyzer import VideoAnalyzer

logger = logging.getLogger(__name__)


class HLSSegmentMonitor:
    """Monitor HLS segments and trigger AI analysis"""

    def __init__(self):
        self.video_analyzer = VideoAnalyzer()
        self.processed_segments = set()
        self.media_dir = getattr(settings, 'MEDIA_ROOT', '/tmp/media')

    def start_monitoring(self, stream_id, check_interval=2):
        """Start monitoring for new segments"""
        logger.info(f"Starting HLS monitoring for stream {stream_id}")
        while True:
            try:
                self.check_for_new_segments(stream_id)
                time.sleep(check_interval)
            except KeyboardInterrupt:
                logger.info("Monitoring stopped by user")
                break
            except Exception as e:
                logger.error(f"Monitor error: {e}")
                time.sleep(check_interval)

    def check_for_new_segments(self, stream_id):
        """Check for new .ts segments and queue them for analysis"""
        try:
            # Look for .ts files in media directory
            media_path = Path(self.media_dir)

            # Find .ts files that match the stream pattern
            pattern = f"*{stream_id}*.ts"
            ts_files = list(media_path.glob(pattern))
            if not ts_files:
                # Also check for generic .ts files
                ts_files = list(media_path.glob("*.ts"))

            for ts_file in ts_files:
                file_path = str(ts_file)

                # Skip if already processed
                if file_path in self.processed_segments:
                    continue

                # Check if file is complete (not being written)
                if self.is_file_complete(file_path):
                    logger.info(f"Found new segment: {file_path}")

                    # Queue for analysis
                    success = self.video_analyzer.queue_segment_analysis(stream_id, file_path)
                    if success:
                        self.processed_segments.add(file_path)
                        logger.info(f"Queued segment for analysis: {file_path}")
        except Exception as e:
            logger.error(f"Error checking segments: {e}")

    def is_file_complete(self, file_path, stable_time=1):
        """Check if file is stable (not being written)"""
        try:
            stat1 = os.stat(file_path)
            time.sleep(stable_time)
            stat2 = os.stat(file_path)

            # File is stable if size and modification time haven't changed
            return stat1.st_size == stat2.st_size and stat1.st_mtime == stat2.st_mtime
        except (OSError, FileNotFoundError):
            return False

    def trigger_manual_analysis(self, stream_id, segment_path):
        """Manually trigger analysis for a specific segment"""
        return self.video_analyzer.queue_segment_analysis(stream_id, segment_path)
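The stability check in `is_file_complete` can be exercised standalone with the standard library (a sketch; a short `stable_time` keeps the check fast here, while 1 second is the default in the monitor above):

```python
import os
import time
import tempfile

def is_file_complete(file_path, stable_time=0.1):
    """True if size and mtime are unchanged over stable_time seconds."""
    try:
        stat1 = os.stat(file_path)
        time.sleep(stable_time)
        stat2 = os.stat(file_path)
        return stat1.st_size == stat2.st_size and stat1.st_mtime == stat2.st_mtime
    except OSError:  # also covers FileNotFoundError
        return False

# Create a finished "segment" and verify it reads as stable
with tempfile.NamedTemporaryFile(delete=False, suffix='.ts') as f:
    f.write(b'\x00' * 188)  # one MPEG-TS packet worth of bytes
    path = f.name

complete = is_file_complete(path)          # nothing is writing it, so True
missing = is_file_complete(path + '.nope')  # missing file -> False
os.unlink(path)
```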

View File

@@ -6,5 +6,6 @@ urlpatterns = [
    path('streams/create/', views.create_stream, name='create_stream'),
    path('streams/<int:stream_id>/start/', views.start_stream, name='start_stream'),
    path('streams/<int:stream_id>/stop/', views.stop_stream, name='stop_stream'),
    path('streams/<str:stream_id>/analyze/', views.trigger_analysis, name='trigger_analysis'),
    path('hls/<str:filename>', views.serve_hls_file, name='serve_hls_file'),
]

View File

@@ -5,6 +5,7 @@ from django.shortcuts import get_object_or_404
from django.conf import settings
from .models import VideoStream, StreamStatus
from .source_adapters import SourceAdapterFactory
from ai_processing.processors.video_analyzer import VideoAnalyzer
import json
import uuid
import logging
@@ -112,6 +113,24 @@ def serve_hls_file(request, filename):
    if not os.path.exists(file_path):
        raise Http404("HLS file not found")

    # Trigger analysis for new .ts segments
    if filename.endswith('.ts'):
        try:
            # Extract stream ID from UUID-based filename: 43606ec7-786c-4f7d-acf3-95981f9e5ebe-415.ts
            if '-' in filename:
                # Split by dash and take first 5 parts (UUID format)
                parts = filename.split('-')
                if len(parts) >= 5:
                    stream_id = '-'.join(parts[:5])  # Reconstruct UUID
                    # Queue for analysis
                    analyzer = VideoAnalyzer()
                    analyzer.queue_segment_analysis(stream_id, file_path)
                    logger.info(f"Queued segment for analysis: {filename} (stream: {stream_id})")
        except Exception as e:
            logger.error(f"Error queuing analysis for {filename}: {e}")

    # Determine content type
    if filename.endswith('.m3u8'):
        content_type = 'application/vnd.apple.mpegurl'
@@ -125,3 +144,33 @@ def serve_hls_file(request, filename):
        response = HttpResponse(f.read(), content_type=content_type)
        response['Cache-Control'] = 'no-cache'
        return response


@csrf_exempt
@require_http_methods(["POST"])
def trigger_analysis(request, stream_id):
    """Manually trigger analysis for testing"""
    try:
        data = json.loads(request.body) if request.body else {}
        segment_path = data.get('segment_path')
        if not segment_path:
            # Find latest segment
            media_dir = os.path.join(settings.BASE_DIR.parent.parent, 'media')
            ts_files = [f for f in os.listdir(media_dir) if f.endswith('.ts')]
            if ts_files:
                segment_path = os.path.join(media_dir, ts_files[-1])
            else:
                return JsonResponse({'error': 'No segments found'}, status=404)

        analyzer = VideoAnalyzer()
        success = analyzer.queue_segment_analysis(stream_id, segment_path)
        if success:
            return JsonResponse({'message': 'Analysis triggered', 'segment': segment_path})
        else:
            return JsonResponse({'error': 'Failed to queue analysis'}, status=500)
    except Exception as e:
        logger.error(f"Error triggering analysis: {e}")
        return JsonResponse({'error': str(e)}, status=500)
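The UUID-reconstruction logic in `serve_hls_file` is easy to verify in isolation (pure stdlib; `stream_id_from_segment` is a hypothetical helper, the sample filename comes from the comment in the view, and the `uuid.UUID` round-trip is slightly stricter than the view's part-count check):

```python
import uuid

def stream_id_from_segment(filename):
    """Recover the stream UUID from names like '<uuid>-<seq>.ts'; None if absent."""
    parts = filename.split('-')
    if len(parts) >= 5:
        candidate = '-'.join(parts[:5])
        try:
            uuid.UUID(candidate)  # reject non-UUID prefixes
            return candidate
        except ValueError:
            return None
    return None

sid = stream_id_from_segment('43606ec7-786c-4f7d-acf3-95981f9e5ebe-415.ts')
```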

View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python
"""Quick test script to verify AI pipeline works"""
import os
import django
import sys
from pathlib import Path

# Add the backend directory to Python path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))

# Configure Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()

from ai_processing.processors.video_analyzer import VideoAnalyzer
from ai_processing.models import AnalysisProvider
from PIL import Image
import numpy as np


def test_ai_pipeline():
    print("🧪 Testing AI Pipeline...")

    # Check providers
    providers = AnalysisProvider.objects.all()
    print(f"📊 Found {providers.count()} providers:")
    for p in providers:
        print(f"  - {p.name} ({p.provider_type}) - Active: {p.active}")

    # Create test analyzer
    analyzer = VideoAnalyzer()

    # Create a test image (simple colored rectangle)
    print("\n🖼️ Creating test image...")
    test_image = Image.new('RGB', (640, 480), color='red')

    # Test synchronous analysis
    print("🔍 Running synchronous analysis...")
    try:
        result = analyzer.analyze_frame_sync(test_image)
        print(f"✅ Analysis result: {result}")
        if 'error' in result:
            print(f"❌ Error: {result['error']}")
        else:
            print(f"✅ Found {len(result.get('logos', []))} logo detections")
            for logo in result.get('logos', []):
                print(f"  - {logo['label']}: {logo['confidence']:.3f}")
    except Exception as e:
        print(f"❌ Analysis failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    test_ai_pipeline()


@@ -0,0 +1,88 @@
#!/usr/bin/env python
"""Test frontend integration by creating sample analysis data"""
import os
import django
import sys
from pathlib import Path
# Configure Django
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()
from ai_processing.models import VideoAnalysis, DetectionResult, VisualAnalysis, AnalysisProvider
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def create_sample_analysis():
"""Create sample analysis data for testing frontend"""
print("🎯 Creating sample analysis data...")
# Get CLIP provider
provider = AnalysisProvider.objects.filter(provider_type='local_clip').first()
# Create analysis
analysis = VideoAnalysis.objects.create(
stream_id='test_stream',
segment_path='/fake/path.ts',
provider=provider,
analysis_type='logo_detection',
frame_timestamp=0.0,
confidence_threshold=0.3
)
# Create sample detections
DetectionResult.objects.create(
analysis=analysis,
label='Apple',
confidence=0.85,
bbox_x=0.2,
bbox_y=0.3,
bbox_width=0.3,
bbox_height=0.2,
detection_type='logo'
)
DetectionResult.objects.create(
analysis=analysis,
label='Google',
confidence=0.72,
bbox_x=0.5,
bbox_y=0.1,
bbox_width=0.25,
bbox_height=0.15,
detection_type='logo'
)
# Create visual analysis
VisualAnalysis.objects.create(
analysis=analysis,
dominant_colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]],
brightness_level=0.7,
contrast_level=0.5,
saturation_level=0.8
)
print(f"✅ Created analysis: {analysis.to_dict()}")
# Try to send via WebSocket
try:
channel_layer = get_channel_layer()
if channel_layer:
async_to_sync(channel_layer.group_send)(
"stream_test_stream",
{
"type": "analysis_update",
"analysis": analysis.to_dict()
}
)
print("📡 Sent WebSocket update")
else:
print("⚠️ No channel layer configured")
except Exception as e:
print(f"❌ WebSocket send failed: {e}")
if __name__ == "__main__":
create_sample_analysis()


@@ -0,0 +1,92 @@
#!/usr/bin/env python
"""Test the complete pipeline: AI analysis -> WebSocket -> Frontend"""
import os
import django
import sys
import json
import asyncio
from pathlib import Path
# Add the backend directory to Python path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
# Configure Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()
from ai_processing.tasks import process_video_segment
from ai_processing.models import VideoAnalysis, AnalysisProvider
from PIL import Image, ImageDraw, ImageFont
import tempfile
def create_test_image_with_apple_logo():
"""Create a test image with 'Apple' text as logo simulation"""
img = Image.new('RGB', (640, 480), color='white')
draw = ImageDraw.Draw(img)
# Draw "Apple" text in the center
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", 48)
except OSError:  # font file not installed; fall back to PIL's default
font = ImageFont.load_default()
text = "Apple iPhone"
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = (640 - text_width) // 2
y = (480 - text_height) // 2
draw.text((x, y), text, fill='black', font=font)
return img
def test_full_pipeline():
print("🧪 Testing Complete Pipeline...")
# Create test image
print("🖼️ Creating test image with Apple logo simulation...")
test_image = create_test_image_with_apple_logo()
# Save to temporary file as a fake video segment
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp_file:
test_image.save(tmp_file.name)
fake_segment_path = tmp_file.name
print(f"💾 Saved test image to: {fake_segment_path}")
# Test the processing task directly
print("🚀 Triggering analysis task...")
try:
result = process_video_segment('test_stream', fake_segment_path)
print(f"✅ Task result: {result}")
# Check if analysis was stored
analysis = VideoAnalysis.objects.filter(stream_id='test_stream').last()
if analysis:
print(f"📊 Analysis stored: {analysis.to_dict()}")
detections = analysis.detections.all()
print(f"🎯 Found {detections.count()} detections:")
for detection in detections:
print(f" - {detection.label}: {detection.confidence:.3f}")
else:
print("❌ No analysis found in database")
except Exception as e:
print(f"❌ Task failed: {e}")
import traceback
traceback.print_exc()
finally:
# Cleanup
try:
os.unlink(fake_segment_path)
except OSError:
pass
if __name__ == "__main__":
test_full_pipeline()


@@ -12,8 +12,17 @@
<div class="viewer-section">
  <app-stream-viewer
    [streamUrl]="selectedStreamUrl"
    [detections]="currentDetections">
  </app-stream-viewer>
</div>
<div class="analysis-section">
  <app-analysis-panel
    [analyses]="recentAnalyses"
    [currentDetections]="currentDetections"
    [currentVisual]="currentVisual">
  </app-analysis-panel>
</div>
</main>
</div>


@@ -1,22 +1,61 @@
import { Component, OnInit, OnDestroy } from '@angular/core';
import { RouterOutlet } from '@angular/router';
import { HttpClientModule } from '@angular/common/http';
import { StreamControlComponent } from './components/stream-control/stream-control.component';
import { StreamViewerComponent } from './components/stream-viewer/stream-viewer.component';
import { AnalysisPanelComponent } from './components/analysis-panel/analysis-panel.component';
import { AnalysisService } from './services/analysis.service';
import { DetectionResult, VisualAnalysis, Analysis } from './models/analysis';

@Component({
  selector: 'app-root',
  standalone: true,
  imports: [RouterOutlet, HttpClientModule, StreamControlComponent, StreamViewerComponent, AnalysisPanelComponent],
  templateUrl: './app.component.html',
  styleUrl: './app.component.scss'
})
export class AppComponent implements OnInit, OnDestroy {
  title = 'Media Analyzer';
  selectedStreamUrl: string = '';
  currentStreamId: string = '';
  currentDetections: DetectionResult[] = [];
  currentVisual?: VisualAnalysis;
  recentAnalyses: Analysis[] = [];

  constructor(private analysisService: AnalysisService) {}

  ngOnInit() {
    // Subscribe to analysis updates
    this.analysisService.detections$.subscribe(detections => {
      this.currentDetections = detections;
    });
    this.analysisService.visual$.subscribe(visual => {
      this.currentVisual = visual || undefined;
    });
    this.analysisService.analyses$.subscribe(analyses => {
      this.recentAnalyses = analyses;
    });
  }

  ngOnDestroy() {
    this.analysisService.disconnect();
  }

  onStreamSelected(streamUrl: string) {
    console.log('App received stream URL:', streamUrl);
    this.selectedStreamUrl = streamUrl;
    // Extract stream ID from URL: /streaming/hls/43606ec7-786c-4f7d-acf3-95981f9e5ebe.m3u8
    const streamIdMatch = streamUrl.match(/hls\/([0-9a-f-]+)\.m3u8/);
    if (streamIdMatch) {
      this.currentStreamId = streamIdMatch[1];
      console.log('Extracted stream ID:', this.currentStreamId);
      // Connect to WebSocket for this stream
      this.analysisService.connectToStream(this.currentStreamId);
    } else {
      console.error('Could not extract stream ID from URL:', streamUrl);
    }
  }
}


@@ -1 +1,68 @@
<div class="analysis-panel">
<h3>Live Analysis Results</h3>
@if (currentVisual) {
<div class="visual-analysis">
<h4>Visual Properties</h4>
<div class="property">
<span>Brightness:</span>
<div class="brightness-bar">
<div class="brightness-fill" [style.width.%]="currentVisual.brightness_level * 100"></div>
</div>
<span>{{(currentVisual.brightness_level * 100) | number:'1.0-0'}}%</span>
</div>
@if (currentVisual.dominant_colors && currentVisual.dominant_colors.length > 0) {
<div class="property">
<span>Colors:</span>
<div class="color-palette">
@for (color of currentVisual.dominant_colors; track $index) {
<div class="color-swatch" [style.background]="getColorStyle([color])"></div>
}
</div>
</div>
}
</div>
}
@if (getDetectionsByType('logo').length > 0) {
<div class="detection-section">
<h4>🏷️ Logos Detected</h4>
@for (detection of getDetectionsByType('logo'); track detection.id) {
<div class="detection-item logo">
<span class="label">{{detection.label}}</span>
<span class="confidence">{{(detection.confidence * 100) | number:'1.0-0'}}%</span>
</div>
}
</div>
}
@if (getDetectionsByType('object').length > 0) {
<div class="detection-section">
<h4>📦 Objects Detected</h4>
@for (detection of getDetectionsByType('object'); track detection.id) {
<div class="detection-item object">
<span class="label">{{detection.label}}</span>
<span class="confidence">{{(detection.confidence * 100) | number:'1.0-0'}}%</span>
</div>
}
</div>
}
@if (getDetectionsByType('text').length > 0) {
<div class="detection-section">
<h4>📝 Text Detected</h4>
@for (detection of getDetectionsByType('text'); track detection.id) {
<div class="detection-item text">
<span class="label">{{detection.label}}</span>
</div>
}
</div>
}
@if (currentDetections.length === 0) {
<div class="no-detections">
<p>No detections in current frame</p>
</div>
}
</div>


@@ -0,0 +1,104 @@
.analysis-panel {
padding: 1rem;
background: #f8f9fa;
border-radius: 8px;
min-width: 300px;
h3 {
margin: 0 0 1rem 0;
color: #333;
}
h4 {
margin: 0 0 0.5rem 0;
font-size: 1rem;
color: #555;
}
}
.visual-analysis {
margin-bottom: 1.5rem;
.property {
display: flex;
align-items: center;
gap: 0.5rem;
margin-bottom: 0.5rem;
span {
min-width: 80px;
font-size: 0.9rem;
}
}
.brightness-bar {
flex: 1;
height: 20px;
background: #ddd;
border-radius: 10px;
overflow: hidden;
.brightness-fill {
height: 100%;
background: linear-gradient(to right, #000, #fff);
transition: width 0.3s ease;
}
}
.color-palette {
display: flex;
gap: 4px;
.color-swatch {
width: 20px;
height: 20px;
border-radius: 50%;
border: 1px solid #ccc;
}
}
}
.detection-section {
margin-bottom: 1rem;
.detection-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 0.5rem;
margin-bottom: 0.25rem;
border-radius: 4px;
font-size: 0.9rem;
&.logo {
background: #ffe6e6;
border-left: 4px solid #ff0000;
}
&.object {
background: #e6ffe6;
border-left: 4px solid #00ff00;
}
&.text {
background: #e6e6ff;
border-left: 4px solid #0000ff;
}
.label {
font-weight: 500;
}
.confidence {
font-size: 0.8rem;
color: #666;
}
}
}
.no-detections {
text-align: center;
color: #999;
font-style: italic;
margin: 2rem 0;
}


@@ -1,12 +1,26 @@
import { Component, Input } from '@angular/core';
import { CommonModule } from '@angular/common';
import { Analysis, DetectionResult, VisualAnalysis } from '../../models/analysis';

@Component({
  selector: 'app-analysis-panel',
  standalone: true,
  imports: [CommonModule],
  templateUrl: './analysis-panel.component.html',
  styleUrl: './analysis-panel.component.scss'
})
export class AnalysisPanelComponent {
  @Input() analyses: Analysis[] = [];
  @Input() currentDetections: DetectionResult[] = [];
  @Input() currentVisual?: VisualAnalysis;

  getDetectionsByType(type: string): DetectionResult[] {
    return this.currentDetections.filter(d => d.detection_type === type);
  }

  getColorStyle(colors: number[][]): string {
    if (!colors || colors.length === 0) return '';
    const [r, g, b] = colors[0];
    return `rgb(${r}, ${g}, ${b})`;
  }
}


@@ -1,10 +1,14 @@
<div class="stream-viewer">
  <div class="video-container">
    <video #video controls muted class="video-player">
      Your browser does not support video playback.
    </video>
    <canvas #overlay class="detection-overlay"></canvas>
  </div>
  <div class="controls">
    <button (click)="play()" class="btn">Play</button>
    <button (click)="pause()" class="btn">Pause</button>
    <button (click)="toggleOverlay()" class="btn">{{showOverlay ? 'Hide' : 'Show'}} Detections</button>
  </div>
</div>


@@ -3,11 +3,26 @@
max-width: 800px;
margin: 0 auto;

.video-container {
  position: relative;
  display: inline-block;
  width: 100%;
}

.video-player {
  width: 100%;
  height: auto;
  background: #000;
  border-radius: 8px;
  display: block;
}

.detection-overlay {
  position: absolute;
  top: 0;
  left: 0;
  pointer-events: none;
  border-radius: 8px;
}

.controls {


@@ -1,5 +1,6 @@
import { Component, ElementRef, Input, ViewChild, AfterViewInit, OnDestroy, OnChanges } from '@angular/core';
import Hls from 'hls.js';
import { Analysis, DetectionResult } from '../../models/analysis';

@Component({
  selector: 'app-stream-viewer',
@@ -10,22 +11,30 @@
})
export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChanges {
  @ViewChild('video', { static: true }) videoElement!: ElementRef<HTMLVideoElement>;
  @ViewChild('overlay', { static: true }) overlayElement!: ElementRef<HTMLCanvasElement>;
  @Input() streamUrl: string = '';
  @Input() detections: DetectionResult[] = [];
  showOverlay = true;

  private hls?: Hls;
  private ctx?: CanvasRenderingContext2D;

  ngAfterViewInit() {
    this.setupCanvas();
    if (this.streamUrl) {
      this.loadStream(this.streamUrl);
    }
  }

  ngOnChanges() {
    if (this.streamUrl && this.videoElement) {
      this.loadStream(this.streamUrl);
    }
    // Redraw detections when they change
    if (this.ctx) {
      this.drawDetections();
    }
  }

  ngOnDestroy() {
@@ -44,7 +53,7 @@
    if (Hls.isSupported()) {
      this.hls = new Hls({
        debug: false,
        enableWorker: false
      });
@@ -93,4 +102,89 @@
      console.error('Pause error:', error);
    }
  }
toggleOverlay() {
this.showOverlay = !this.showOverlay;
if (this.showOverlay) {
this.drawDetections();
} else {
this.clearOverlay();
}
}
private setupCanvas() {
const video = this.videoElement.nativeElement;
const canvas = this.overlayElement.nativeElement;
this.ctx = canvas.getContext('2d')!;
// Sync canvas size with video
const resizeCanvas = () => {
canvas.width = video.clientWidth;
canvas.height = video.clientHeight;
canvas.style.width = video.clientWidth + 'px';
canvas.style.height = video.clientHeight + 'px';
this.drawDetections();
};
video.addEventListener('loadedmetadata', resizeCanvas);
video.addEventListener('resize', resizeCanvas);
window.addEventListener('resize', resizeCanvas);
}
private drawDetections() {
if (!this.ctx || !this.showOverlay) return;
this.clearOverlay();
const canvas = this.overlayElement.nativeElement;
const video = this.videoElement.nativeElement;
// Draw each detection
this.detections.forEach(detection => {
this.drawDetection(detection, canvas.width, canvas.height);
});
}
private drawDetection(detection: DetectionResult, canvasWidth: number, canvasHeight: number) {
if (!this.ctx) return;
// Convert normalized bbox to canvas coordinates
const x = detection.bbox.x * canvasWidth;
const y = detection.bbox.y * canvasHeight;
const width = detection.bbox.width * canvasWidth;
const height = detection.bbox.height * canvasHeight;
// Color by detection type
const colors = {
'object': '#00ff00',
'logo': '#ff0000',
'text': '#0000ff'
};
const color = colors[detection.detection_type] || '#ffffff';
// Draw bounding box
this.ctx.strokeStyle = color;
this.ctx.lineWidth = 2;
this.ctx.strokeRect(x, y, width, height);
// Draw label
this.ctx.fillStyle = color;
this.ctx.font = '14px Arial';
const label = `${detection.label} (${Math.round(detection.confidence * 100)}%)`;
const textWidth = this.ctx.measureText(label).width;
// Background for text
this.ctx.fillStyle = 'rgba(0, 0, 0, 0.7)';
this.ctx.fillRect(x, y - 20, textWidth + 8, 20);
// Text
this.ctx.fillStyle = color;
this.ctx.fillText(label, x + 4, y - 6);
}
private clearOverlay() {
if (!this.ctx) return;
const canvas = this.overlayElement.nativeElement;
this.ctx.clearRect(0, 0, canvas.width, canvas.height);
}
}


@@ -1,2 +1,36 @@
export interface BoundingBox {
x: number;
y: number;
width: number;
height: number;
}
export interface DetectionResult {
id: string;
label: string;
confidence: number;
bbox: BoundingBox;
detection_type: 'object' | 'logo' | 'text';
metadata?: any;
}
export interface VisualAnalysis {
dominant_colors: number[][];
brightness_level: number;
contrast_level?: number;
saturation_level?: number;
activity_score?: number;
scene_description?: string;
}
export interface Analysis {
id: string;
stream_id: string;
timestamp: string;
processing_time?: number;
analysis_type: string;
frame_timestamp: number;
provider: string;
detections: DetectionResult[];
visual?: VisualAnalysis;
}


@@ -1,9 +1,74 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { WebsocketService } from './websocket.service';
import { Analysis, DetectionResult, VisualAnalysis } from '../models/analysis';

@Injectable({
  providedIn: 'root'
})
export class AnalysisService {
  private currentDetections = new BehaviorSubject<DetectionResult[]>([]);
  private currentVisual = new BehaviorSubject<VisualAnalysis | null>(null);
  private recentAnalyses = new BehaviorSubject<Analysis[]>([]);

  public detections$ = this.currentDetections.asObservable();
  public visual$ = this.currentVisual.asObservable();
  public analyses$ = this.recentAnalyses.asObservable();

  constructor(private websocketService: WebsocketService) {
// Subscribe to WebSocket analysis updates
this.websocketService.analysis$.subscribe(analysis => {
this.handleAnalysisUpdate(analysis);
});
}
connectToStream(streamId: string) {
this.websocketService.connect(streamId);
}
disconnect() {
this.websocketService.disconnect();
this.currentDetections.next([]);
this.currentVisual.next(null);
}
private handleAnalysisUpdate(analysis: Analysis) {
// Update recent analyses list
const current = this.recentAnalyses.value;
const updated = [analysis, ...current.slice(0, 9)]; // Keep last 10
this.recentAnalyses.next(updated);
// Update current detections (latest analysis only)
const detections = analysis.detections || [];
this.currentDetections.next(detections);
// Update visual analysis
if (analysis.visual) {
this.currentVisual.next(analysis.visual);
}
console.log('Analysis update:', {
detections: detections.length,
visual: !!analysis.visual,
timestamp: analysis.timestamp
});
}
getCurrentDetections(): DetectionResult[] {
return this.currentDetections.value;
}
getCurrentVisual(): VisualAnalysis | null {
return this.currentVisual.value;
}
getDetectionsByType(type: string): DetectionResult[] {
return this.currentDetections.value.filter(d => d.detection_type === type);
}
clearAnalysis() {
this.currentDetections.next([]);
this.currentVisual.next(null);
this.recentAnalyses.next([]);
}
}


@@ -1,9 +1,81 @@
import { Injectable } from '@angular/core';
import { Subject, Observable, BehaviorSubject } from 'rxjs';
import { Analysis } from '../models/analysis';

@Injectable({
  providedIn: 'root'
})
export class WebsocketService {
  private socket?: WebSocket;
  private analysisSubject = new Subject<Analysis>();
  private connectionStatus = new BehaviorSubject<boolean>(false);

  public analysis$ = this.analysisSubject.asObservable();
  public connectionStatus$ = this.connectionStatus.asObservable();

  constructor() { }
connect(streamId: string) {
if (this.socket?.readyState === WebSocket.OPEN) {
return;
}
const wsUrl = `ws://localhost:8000/ws/stream/${streamId}/`;
console.log('Connecting to WebSocket:', wsUrl);
this.socket = new WebSocket(wsUrl);
this.socket.onopen = () => {
console.log('WebSocket connected');
this.connectionStatus.next(true);
};
this.socket.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
console.log('WebSocket message:', data);
if (data.type === 'analysis_update') {
this.analysisSubject.next(data.analysis);
} else if (data.type === 'recent_analysis') {
data.analyses.forEach((analysis: Analysis) => {
this.analysisSubject.next(analysis);
});
}
} catch (error) {
console.error('Error parsing WebSocket message:', error);
}
};
this.socket.onclose = () => {
console.log('WebSocket disconnected');
this.connectionStatus.next(false);
};
this.socket.onerror = (error) => {
console.error('WebSocket error:', error);
this.connectionStatus.next(false);
};
}
disconnect() {
if (this.socket) {
this.socket.close();
this.socket = undefined;
this.connectionStatus.next(false);
}
}
send(message: any) {
if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message));
}
}
ping() {
this.send({
type: 'ping',
timestamp: Date.now()
});
}
}

progress.md (new file)

@@ -0,0 +1,61 @@
┌─────────────────────────┬──────────────────────────────────┬──────────────────────────────────┬──────────────────────────────────┐
│ FEATURES \ PHASES │ 📋 Definitions/Infra/Bootstrap │ 🔧 Local Single Server │ 🚀 Production Event-Ready │
├─────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ 🎥 Stream Ingestion │ Backend: │ Backend: │ Backend: │
│ │ [x] Django streaming app │ [x] RTMP endpoint accepting OBS │ [ ] WebSocket stream status │
│ │ [x] RTMP server models │ [x] FFmpeg HLS conversion │ [ ] Redis-backed stream queue │
│ │ [x] FFmpeg integration setup │ [x] Video segment serving │ [ ] Multi-stream handling │
│ │ [x] HLS file management │ [x] Stream status API │ [ ] Auto-scaling triggers │
│ │ │ │ │
│ │ Frontend: │ Frontend: │ Frontend: │
│ │ [x] Angular stream components │ [x] Working video player HLS.js │ [ ] Real-time stream updates │
│ │ [x] Video player service │ [x] Stream selection dropdown │ [ ] Multi-viewer support │
│ │ [x] Stream control UI │ [x] Basic stream controls │ [ ] Reconnection handling │
│ │ │ [x] Connection status display │ [ ] Progressive stream quality │
│ │ │ │ │
│ │ Data/Config: │ Data/Config: │ Data/Config: │
│ │ [x] Stream model schema │ [x] Local media storage │ [ ] K8s persistent volumes │
│ │ [x] Docker services setup │ [x] Docker compose integration │ [ ] Load balancer configuration │
│ │ [ ] K8s base manifests │ [x] Development URLs configured │ [ ] Stream metrics monitoring │
│ │ │ [x] Basic error handling │ [ ] CDN integration ready │
├─────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ 🤖 AI Analysis │ Backend: │ Backend: │ Backend: │
│ │ [x] Django ai_processing app │ [ ] Frame extraction from HLS │ [ ] Distributed processing work │
│ │ [ ] Video analysis models │ [ ] YOLO object detection │ [ ] WebSocket analysis streaming │
│ │ [ ] YOLO/CLIP model loading │ [ ] Analysis results storage │ [ ] Redis result caching │
│ │ [x] Processing strategy pattern │ [ ] Results API endpoint │ [ ] Batch vs real-time modes │
│ │ │ │ │
│ │ Frontend: │ Frontend: │ Frontend: │
│ │ [x] Analysis display components │ [ ] Detection overlay on video │ [ ] Live analysis updates │
│ │ [ ] Results visualization svc │ [ ] Results panel display │ [ ] Analysis history timeline │
│ │ [ ] Detection overlay system │ [ ] Analysis trigger controls │ [ ] Performance dashboards │
│ │ │ [ ] Object list and filtering │ [ ] Multi-stream analysis view │
│ │ │ │ │
│ │ Data/Config: │ Data/Config: │ Data/Config: │
│ │ [ ] Analysis results model │ [ ] Local model storage │ [ ] K8s GPU node pools │
│ │ [ ] Object detection schema │ [ ] Processing queue setup │ [ ] Analysis result streaming │
│ │ [ ] AI model configurations │ [ ] Result data persistence │ [ ] Model versioning system │
│ │ [ ] Celery task definitions │ [ ] Basic performance metrics │ [ ] Cloud storage integration │
├─────────────────────────┼──────────────────────────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ 🎨 Effects Pipeline │ Backend: │ Backend: │ Backend: │
│ │ [x] Django effects app │ [ ] Basic shader pipeline │ [ ] GPU cluster scheduling │
│ │ [ ] Shader management system │ [ ] AI-triggered blur effect │ [ ] Real-time effect streaming │
│ │ [ ] GPU pipeline setup │ [ ] Effect parameter API │ [ ] Effect composition pipeline │
│ │ [ ] Effect trigger engine │ [ ] Manual effect controls │ [ ] Cloud-based rendering │
│ │ │ │ │
│ │ Frontend: │ Frontend: │ Frontend: │
│ │ [x] Effects control panel │ [ ] Effect selection UI │ [ ] Live effect synchronization │
│ │ [x] WebGL shader service │ [ ] WebGL overlay canvas │ [ ] Multi-layer composition │
│ │ [ ] Effect preview system │ [ ] Real-time parameter sliders │ [ ] Effect performance monitor │
│ │ │ [ ] Effect preview mode │ [ ] Collaborative effect edit │
│ │ │ │ │
│ │ Data/Config: │ Data/Config: │ Data/Config: │
│ │ [ ] Effect preset models │ [ ] Local shader compilation │ [ ] GPU resource allocation │
│ │ [x] Shader file organization │ [ ] Effect state management │ [ ] Effect state distribution │
│ │ [ ] GPU resource configs │ [ ] Basic GPU utilization │ [ ] WebGL performance optimize │
│ │ [ ] WebGL asset loading │ [ ] Effect synchronization │ [ ] Effect marketplace ready │
└─────────────────────────┴──────────────────────────────────┴──────────────────────────────────┴──────────────────────────────────┘