improve frontend architecture and websocket communication

This commit is contained in:
buenosairesam
2025-08-25 04:02:24 -03:00
parent ad2b1a8a8f
commit 1cd999f825
40 changed files with 2371 additions and 203 deletions

4
.gitignore vendored
View File

@@ -7,4 +7,6 @@ postgres_data
media/* media/*
!media/.gitkeep !media/.gitkeep
media-analyzer-src.tar.gz media-analyzer-src.tar.gz
media-analyzer/media media-analyzer/media
.env

View File

@@ -0,0 +1,175 @@
dont asume is that do all this checks you mentioned 1. FFmpeg processes not being properly terminated - This is a common cause of memory leaks in video streaming
2. Multiple FFmpeg instances running simultaneously - If the stop/start process doesn't properly kill previous instances
3. Celery workers accumulating memory - If video analysis tasks are not properly cleaning up
4. Video frames not being released from memory - In the analysis engine
5. WebSocket connections not being properly closed
● You're absolutely right - I shouldn't assume. Let me systematically check each of these potential causes:
● Bash(ps aux | grep ffmpeg)
⎿ mariano 1274947 0.0 0.0 7196 3400 ? Ss 04:52 0:00 /bin/bash -c -l source /home/mariano/.claude/shell-snapshots/snapshot-bash-1755846619221-6k5rzy.sh && eval 'ps aux < /dev/null | grep f
fmpeg' && pwd -P >| /tmp/claude-9915-cwd
mariano 1274969 0.0 0.0 6608 2260 ? S 04:52 0:00 grep ffmpeg
● Bash(docker compose exec backend ps aux | grep -E "(ffmpeg|celery)")
 Error: Error
● Bash(docker compose ps)
 NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
media-analyzer-backend-1 media-analyzer-backend "./entrypoint-web.sh" backend 4 minutes ago Up 4 minutes 0.0.0.0:8000->8000/tcp, [::]:8000->8000/tcp
media-analyzer-celery-default-1 media-analyzer-celery-default "./entrypoint-celery…" celery-default 4 minutes ago Up 4 minutes 8000/tcp
… +5 lines (ctrl+r to expand)
● Bash(docker compose exec backend ps aux)
 USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
1000 1 0.9 0.1 104196 26976 ? Ssl 07:48 0:02 /usr/local/bin/python3.11 /usr/local/bin/uvicorn media_analyzer.asgi:application --host 0.0.0.0 --port 8000 --reload
1000 40 0.0 0.0 16400 12416 ? S 07:48 0:00 /usr/local/bin/python3.11 -c from multiprocessing.resource_tracker import main;main(4)
… +3 lines (ctrl+r to expand)
● Bash(docker compose exec celery-logo ps aux)
 USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
appuser 1 0.7 0.6 855508 109060 ? Ss 07:48 0:02 /usr/local/bin/python3.11 /usr/local/bin/celery -A media_analyzer worker -l info --queues=logo_detection --hostname=logo-worker@%h
appuser 24 1.6 8.0 7534128 1315524 ? Sl 07:48 0:04 /usr/local/bin/python3.11 /usr/local/bin/celery -A media_analyzer worker -l info --queues=logo_detection --hostname=logo-worker@%h
… +16 lines (ctrl+r to expand)
● FOUND THE SMOKING GUN! 🔥
Look at the Celery logo worker:
- PID 39: Using 17.5% memory (2.8GB) and 23.7% CPU
- PID 24: Using 8.0% memory (1.3GB)
That's a massive memory leak in the Celery logo detection worker! The workers are accumulating memory and not releasing it.
Let me check the celery-default worker too:
● Bash(docker compose exec celery-default ps aux)
⎿ USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
appuser 1 0.7 0.6 855452 109260 ? Ss 07:48 0:02 /usr/local/bin/python3.11 /usr/local/bin/celery -A media_analyzer worker -l info --queues=default,config_management --hostname=default-
worker@%h
… +33 lines (ctrl+r to expand)
● Perfect! This confirms it's specifically the logo detection Celery worker causing the memory leak. The default worker is using normal ~70MB per process, but the logo worker is consuming 2.8GB and growing.
Let me check what's in the analysis engine that's not being released:
● Read(backend/ai_processing/analysis_engine.py)
 Read 50 lines (ctrl+r to expand)
● The analysis engine holds these detectors as instance variables. Let me check the logo detection adapter that's likely holding onto model memory:
● Read(backend/ai_processing/adapters/logo_detection.py)
 Read 80 lines (ctrl+r to expand)
● FOUND IT! 🎯
The CLIPLogoDetectionAdapter loads the CLIP model in _load_model() at lines 65-69, but never unloads it. Every time the Celery worker processes a video segment:
1. Creates a new AnalysisEngine instance
2. Which creates a new CLIPLogoDetectionAdapter
3. Which loads the CLIP model into memory (~500MB+)
4. Never releases the model after processing
Since the Celery worker is persistent, each task accumulates more model instances in memory. That's why we see 2.8GB+ memory usage!
The fix is to either:
1. Make the model singleton/shared across tasks
2. Explicitly clean up the model after each task
3. Use worker max tasks to restart workers periodically
Let me check the rest of the CLIP adapter to see the complete picture:
● Read(backend/ai_processing/adapters/logo_detection.py)
 Read 52 lines (ctrl+r to expand)
● Perfect! I can see the complete problem. The CLIPLogoDetectionAdapter:
1. Lines 67-69: Loads heavy CLIP model and processor into memory
2. Lines 94-99: Runs inference with torch tensors
3. No cleanup method - never releases the model, processor, or GPU memory
Solution: Add proper cleanup and memory management
Let me implement the fix by adding cleanup methods and ensuring models are released after each task:
● Update(backend/ai_processing/adapters/logo_detection.py)
 Updated backend/ai_processing/adapters/logo_detection.py with 15 additions
97 with torch.no_grad():
98 outputs = self.model(**inputs)
99 probs = outputs.logits_per_image.softmax(dim=1)
100 +
101 + # Clear GPU cache immediately after inference
102 + if torch.cuda.is_available():
103 + torch.cuda.empty_cache()
104 +
105 + # Clear input tensors
106 + del inputs
107 + del outputs
108
109 results = []
110 for i, prob in enumerate(probs[0][:-1]):
...
108 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1} # Full frame for CLIP
109 })
110
111 + # Clear probability tensors
112 + del probs
113 +
114 return sorted(results, key=lambda x: x['confidence'], reverse=True)[:5]
115
116 except Exception as e:
117 logger.error(f"CLIP logo detection error: {e}")
118 return []
119 + finally:
120 + # Force garbage collection after processing
121 + import gc
122 + gc.collect()
123
124
125 class LogoDetectionAdapterFactory(AdapterFactory):
● Now I need to add a cleanup method to properly release the model and add task-level cleanup. Let me add the cleanup method:
> why it happened only on webcam?
● Great question! The memory leak affected all logo detection tasks, but it was only noticeable with webcam because of the frequency difference:
Frequency Analysis 📊
Webcam Stream:
- Continuous 30fps video → HLS segments every ~2-6 seconds
- Each .ts segment triggers logo detection task
- With webcam running: ~10-30 logo detection tasks per minute
- Memory accumulation: 500MB × 30 tasks/min = 15GB/hour
RTMP Stream:
- Only runs when you manually stream from OBS
- You probably tested RTMP for shorter periods
- Intermittent usage = fewer tasks = slower memory accumulation
- Memory leak was still happening, just not reaching critical levels
Why Webcam Exposed It
1. Always-on nature: Webcam starts automatically and runs continuously
2. High segment frequency: More .ts files = more Celery tasks = faster leak
3. Sustained load: No breaks between segments like manual RTMP testing
4. System saturation: 2.8GB+ per worker × multiple workers = system choking
The Real Pattern
RTMP: Task → 500MB leak → (gap) → Task → 500MB leak → (gap)
Webcam: Task → Task → Task → Task → Task → 15GB accumulated → 💥
RTMP seemed fine because:
- You'd stream for 30 seconds, test, then stop
- Memory never reached critical mass
- System had time to recover between sessions
Webcam revealed the truth because:
- Continuous operation exposed the cumulative effect
- No recovery time between tasks
- Memory leak compounded until system became unresponsive
The fix I implemented (tensor cleanup, model release, garbage collection) benefits both RTMP and webcam - you just noticed it with webcam because that's where the leak reached critical mass first!

View File

@@ -68,6 +68,21 @@ class CLIPLogoDetectionAdapter(DetectionAdapter):
self.model = CLIPModel.from_pretrained(self.model_identifier) self.model = CLIPModel.from_pretrained(self.model_identifier)
self.processor = CLIPProcessor.from_pretrained(self.model_identifier) self.processor = CLIPProcessor.from_pretrained(self.model_identifier)
def cleanup(self):
"""Release model and processor memory"""
if self.model:
del self.model
self.model = None
if self.processor:
del self.processor
self.processor = None
import torch
import gc
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
def detect(self, image, confidence_threshold=0.5): def detect(self, image, confidence_threshold=0.5):
try: try:
self._load_model() self._load_model()
@@ -97,6 +112,14 @@ class CLIPLogoDetectionAdapter(DetectionAdapter):
with torch.no_grad(): with torch.no_grad():
outputs = self.model(**inputs) outputs = self.model(**inputs)
probs = outputs.logits_per_image.softmax(dim=1) probs = outputs.logits_per_image.softmax(dim=1)
# Clear GPU cache immediately after inference
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Clear input tensors
del inputs
del outputs
results = [] results = []
for i, prob in enumerate(probs[0][:-1]): for i, prob in enumerate(probs[0][:-1]):
@@ -108,11 +131,18 @@ class CLIPLogoDetectionAdapter(DetectionAdapter):
'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1} # Full frame for CLIP 'bbox': {'x': 0, 'y': 0, 'width': 1, 'height': 1} # Full frame for CLIP
}) })
# Clear probability tensors
del probs
return sorted(results, key=lambda x: x['confidence'], reverse=True)[:5] return sorted(results, key=lambda x: x['confidence'], reverse=True)[:5]
except Exception as e: except Exception as e:
logger.error(f"CLIP logo detection error: {e}") logger.error(f"CLIP logo detection error: {e}")
return [] return []
finally:
# Force garbage collection after processing
import gc
gc.collect()
class LogoDetectionAdapterFactory(AdapterFactory): class LogoDetectionAdapterFactory(AdapterFactory):

View File

@@ -113,36 +113,59 @@ class AnalysisEngine:
"""Analyze a single frame using configured adapters and execution strategy""" """Analyze a single frame using configured adapters and execution strategy"""
results = {} results = {}
# Adapter execution map try:
adapter_map = { # Adapter execution map
'object_detection': self.object_detector, adapter_map = {
'logo_detection': self.logo_detector, 'object_detection': self.object_detector,
'text_detection': self.text_detector 'logo_detection': self.logo_detector,
} 'text_detection': self.text_detector
}
# Execute detection using strategy
for analysis_type in requested_analysis:
if analysis_type in adapter_map and adapter_map[analysis_type]:
detections = self.execution_strategy.execute_detection(
adapter_map[analysis_type],
image,
confidence_threshold
)
# Map to expected result format
result_key = {
'object_detection': 'objects',
'logo_detection': 'logos',
'text_detection': 'text'
}.get(analysis_type, analysis_type)
results[result_key] = detections
# Visual properties (always computed locally)
if 'visual_analysis' in requested_analysis:
results['visual'] = self._analyze_visual_properties(image)
return results # Execute detection using strategy
for analysis_type in requested_analysis:
if analysis_type in adapter_map and adapter_map[analysis_type]:
detections = self.execution_strategy.execute_detection(
adapter_map[analysis_type],
image,
confidence_threshold
)
# Map to expected result format
result_key = {
'object_detection': 'objects',
'logo_detection': 'logos',
'text_detection': 'text'
}.get(analysis_type, analysis_type)
results[result_key] = detections
# Visual properties (always computed locally)
if 'visual_analysis' in requested_analysis:
results['visual'] = self._analyze_visual_properties(image)
return results
finally:
# Clean up models after each analysis to prevent memory leaks
self.cleanup()
def cleanup(self):
"""Clean up all models and release memory"""
try:
if self.logo_detector and hasattr(self.logo_detector, 'cleanup'):
self.logo_detector.cleanup()
if self.object_detector and hasattr(self.object_detector, 'cleanup'):
self.object_detector.cleanup()
if self.text_detector and hasattr(self.text_detector, 'cleanup'):
self.text_detector.cleanup()
if self.motion_analyzer and hasattr(self.motion_analyzer, 'cleanup'):
self.motion_analyzer.cleanup()
# Force garbage collection
import gc
gc.collect()
except Exception as e:
logger.error(f"Cleanup error: {e}")
def health_check(self): def health_check(self):
"""Check health of execution strategy and configured adapters""" """Check health of execution strategy and configured adapters"""

View File

@@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2025-08-22 07:48
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ai_processing', '0004_remove_processingqueue_ai_processi_stream__60b035_idx_and_more'),
]
operations = [
migrations.AddField(
model_name='videoanalysis',
name='session_id',
field=models.CharField(blank=True, max_length=100, null=True),
),
]

View File

@@ -38,6 +38,7 @@ class Brand(models.Model):
class VideoAnalysis(models.Model): class VideoAnalysis(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
stream_key = models.CharField(max_length=100) # Use stream_key instead of stream_id stream_key = models.CharField(max_length=100) # Use stream_key instead of stream_id
session_id = models.CharField(max_length=100, null=True, blank=True) # Frontend session tracking
segment_path = models.CharField(max_length=500) segment_path = models.CharField(max_length=500)
timestamp = models.DateTimeField(auto_now_add=True) timestamp = models.DateTimeField(auto_now_add=True)
processing_time = models.FloatField(null=True) processing_time = models.FloatField(null=True)
@@ -50,7 +51,8 @@ class VideoAnalysis(models.Model):
def to_dict(self): def to_dict(self):
return { return {
'id': str(self.id), 'id': str(self.id),
'stream_key': self.stream_key, 'stream_id': self.stream_key, # Frontend expects 'stream_id'
'session_id': self.session_id,
'timestamp': self.timestamp.isoformat(), 'timestamp': self.timestamp.isoformat(),
'processing_time': self.processing_time, 'processing_time': self.processing_time,
'analysis_type': self.analysis_type, 'analysis_type': self.analysis_type,

View File

@@ -50,7 +50,7 @@ class VideoAnalyzer:
except Exception as e: except Exception as e:
logger.error(f"Error setting up providers: {e}") logger.error(f"Error setting up providers: {e}")
def queue_segment_analysis(self, stream_key, segment_path): def queue_segment_analysis(self, stream_key, segment_path, session_id=None):
"""Queue video segment for analysis""" """Queue video segment for analysis"""
try: try:
# Check if already queued # Check if already queued
@@ -73,7 +73,7 @@ class VideoAnalyzer:
) )
# Trigger async processing # Trigger async processing
process_video_segment.delay(stream_key, segment_path) process_video_segment.delay(stream_key, segment_path, session_id)
logger.info(f"Queued segment for analysis: {segment_path}") logger.info(f"Queued segment for analysis: {segment_path}")
return True return True

View File

@@ -11,7 +11,7 @@ channel_layer = get_channel_layer()
@shared_task(bind=True, queue='logo_detection') @shared_task(bind=True, queue='logo_detection')
def analyze_logo_detection(self, stream_key, segment_path): def analyze_logo_detection(self, stream_key, segment_path, session_id=None):
"""Dedicated task for logo detection analysis""" """Dedicated task for logo detection analysis"""
queue_item = None queue_item = None
try: try:
@@ -61,6 +61,7 @@ def analyze_logo_detection(self, stream_key, segment_path):
analysis = VideoAnalysis.objects.create( analysis = VideoAnalysis.objects.create(
stream_key=stream_key, stream_key=stream_key,
session_id=session_id,
segment_path=segment_path, segment_path=segment_path,
provider=provider, provider=provider,
analysis_type='logo_detection', analysis_type='logo_detection',
@@ -83,17 +84,16 @@ def analyze_logo_detection(self, stream_key, segment_path):
) )
detections.append(detection.to_dict()) detections.append(detection.to_dict())
# Send results via WebSocket if detections found # Send results via WebSocket (always send, even with 0 detections)
if detections: websocket_group = f"stream_{stream_key}"
websocket_group = f"stream_{stream_key}" logger.info(f"Sending websocket update to group: {websocket_group} - detections: {len(detections)}")
logger.info(f"Sending websocket update to group: {websocket_group}") async_to_sync(channel_layer.group_send)(
async_to_sync(channel_layer.group_send)( websocket_group,
websocket_group, {
{ "type": "analysis_update",
"type": "analysis_update", "analysis": analysis.to_dict()
"analysis": analysis.to_dict() }
} )
)
# Update queue status # Update queue status
if queue_item: if queue_item:
@@ -117,7 +117,7 @@ def analyze_logo_detection(self, stream_key, segment_path):
@shared_task(bind=True, queue='visual_analysis') @shared_task(bind=True, queue='visual_analysis')
def analyze_visual_properties(self, stream_key, segment_path): def analyze_visual_properties(self, stream_key, segment_path, session_id=None):
"""Dedicated task for visual property analysis""" """Dedicated task for visual property analysis"""
queue_item = None queue_item = None
try: try:
@@ -151,6 +151,7 @@ def analyze_visual_properties(self, stream_key, segment_path):
# Store results (no provider needed for local visual analysis) # Store results (no provider needed for local visual analysis)
analysis = VideoAnalysis.objects.create( analysis = VideoAnalysis.objects.create(
stream_key=stream_key, stream_key=stream_key,
session_id=session_id,
segment_path=segment_path, segment_path=segment_path,
provider=None, # Local analysis provider=None, # Local analysis
analysis_type='visual_analysis', analysis_type='visual_analysis',
@@ -195,14 +196,14 @@ def analyze_visual_properties(self, stream_key, segment_path):
@shared_task(bind=True) @shared_task(bind=True)
def process_video_segment(self, stream_key, segment_path): def process_video_segment(self, stream_key, segment_path, session_id=None):
"""Main task that dispatches to specialized analysis tasks""" """Main task that dispatches to specialized analysis tasks"""
try: try:
# Dispatch to specialized queues based on available capabilities # Dispatch to specialized queues based on available capabilities
active_capabilities = config_manager.get_active_capabilities() active_capabilities = config_manager.get_active_capabilities()
if 'logo_detection' in active_capabilities: if 'logo_detection' in active_capabilities:
analyze_logo_detection.delay(stream_key, segment_path) analyze_logo_detection.delay(stream_key, segment_path, session_id)
# Visual analysis disabled for performance - only logo detection # Visual analysis disabled for performance - only logo detection
# analyze_visual_properties.delay(stream_key, segment_path) # analyze_visual_properties.delay(stream_key, segment_path)

View File

@@ -5,7 +5,7 @@ from ai_processing.models import VideoAnalysis, AnalysisProvider, Brand
@require_http_methods(["GET"]) @require_http_methods(["GET"])
def stream_analysis(request, stream_id): def stream_analysis(request, stream_id):
analyses = VideoAnalysis.objects.filter(stream_id=stream_id).order_by('-timestamp') analyses = VideoAnalysis.objects.filter(stream_key=stream_id).order_by('-timestamp')
return JsonResponse({'results': [a.to_dict() for a in analyses]}) return JsonResponse({'results': [a.to_dict() for a in analyses]})

View File

@@ -2,6 +2,7 @@ import json
import logging import logging
from channels.generic.websocket import AsyncWebsocketConsumer from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async from channels.db import database_sync_to_async
from django.core.cache import cache
from ai_processing.models import VideoAnalysis from ai_processing.models import VideoAnalysis
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -13,6 +14,7 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
async def connect(self): async def connect(self):
# Initialize subscription set for dynamic stream groups # Initialize subscription set for dynamic stream groups
self.subscribed_streams = set() self.subscribed_streams = set()
self.stream_sessions = {} # Track session IDs per stream
await self.accept() await self.accept()
logger.info("WebSocket connected - ready to subscribe to streams") logger.info("WebSocket connected - ready to subscribe to streams")
@@ -27,6 +29,7 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
try: try:
data = json.loads(text_data) data = json.loads(text_data)
message_type = data.get('type') message_type = data.get('type')
logger.info(f"WebSocket received message: {data}")
if message_type == 'ping': if message_type == 'ping':
await self.send(text_data=json.dumps({ await self.send(text_data=json.dumps({
@@ -35,14 +38,21 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
})) }))
elif message_type == 'subscribe': elif message_type == 'subscribe':
stream_key = data.get('stream_id') # Frontend still sends 'stream_id' but it's actually stream_key stream_key = data.get('stream_id') # Frontend still sends 'stream_id' but it's actually stream_key
session_id = data.get('session_id') # Get session ID from frontend
logger.info(f"Subscribe request: stream_key={stream_key}, session_id={session_id}")
if stream_key and stream_key not in self.subscribed_streams: if stream_key and stream_key not in self.subscribed_streams:
self.subscribed_streams.add(stream_key) self.subscribed_streams.add(stream_key)
self.stream_sessions[stream_key] = session_id # Track session for this stream
# Store session in cache for HTTP access (persistent)
cache.set(f"stream_session_{stream_key}", session_id, None) # No expiration
logger.info(f"Stored session in cache: stream_session_{stream_key} = {session_id}")
await self.channel_layer.group_add(f"stream_{stream_key}", self.channel_name) await self.channel_layer.group_add(f"stream_{stream_key}", self.channel_name)
await self.send_recent_analysis(stream_key) await self.send_recent_analysis(stream_key, session_id)
elif message_type == 'unsubscribe': elif message_type == 'unsubscribe':
stream_key = data.get('stream_id') # Frontend still sends 'stream_id' but it's actually stream_key stream_key = data.get('stream_id') # Frontend still sends 'stream_id' but it's actually stream_key
if stream_key and stream_key in self.subscribed_streams: if stream_key and stream_key in self.subscribed_streams:
self.subscribed_streams.remove(stream_key) self.subscribed_streams.remove(stream_key)
self.stream_sessions.pop(stream_key, None) # Remove session tracking
await self.channel_layer.group_discard(f"stream_{stream_key}", self.channel_name) await self.channel_layer.group_discard(f"stream_{stream_key}", self.channel_name)
elif message_type == 'request_analysis': elif message_type == 'request_analysis':
# Trigger analysis if needed # Trigger analysis if needed
@@ -58,20 +68,21 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
})) }))
@database_sync_to_async @database_sync_to_async
def get_recent_analysis(self, stream_key): def get_recent_analysis(self, stream_key, session_id=None):
"""Get recent analysis results for a given stream""" """Get recent analysis results for a given stream and session"""
try: try:
analyses = VideoAnalysis.objects.filter( query = VideoAnalysis.objects.filter(stream_key=stream_key)
stream_key=stream_key if session_id:
).order_by('-timestamp')[:5] query = query.filter(session_id=session_id)
analyses = query.order_by('-timestamp')[:5]
return [analysis.to_dict() for analysis in analyses] return [analysis.to_dict() for analysis in analyses]
except Exception as e: except Exception as e:
logger.error(f"Error getting recent analysis for {stream_key}: {e}") logger.error(f"Error getting recent analysis for {stream_key}: {e}")
return [] return []
async def send_recent_analysis(self, stream_key): async def send_recent_analysis(self, stream_key, session_id=None):
"""Send recent analysis results to client for the given stream""" """Send recent analysis results to client for the given stream and session"""
recent_analyses = await self.get_recent_analysis(stream_key) recent_analyses = await self.get_recent_analysis(stream_key, session_id)
if recent_analyses: if recent_analyses:
await self.send(text_data=json.dumps({ await self.send(text_data=json.dumps({
'type': 'recent_analysis', 'type': 'recent_analysis',

View File

@@ -7,6 +7,7 @@ urlpatterns = [
path('streams/webcam/start/', views.start_webcam_stream, name='start_webcam_stream'), path('streams/webcam/start/', views.start_webcam_stream, name='start_webcam_stream'),
path('streams/<str:stream_key>/start/', views.start_stream, name='start_stream'), path('streams/<str:stream_key>/start/', views.start_stream, name='start_stream'),
path('streams/<str:stream_key>/stop/', views.stop_stream, name='stop_stream'), path('streams/<str:stream_key>/stop/', views.stop_stream, name='stop_stream'),
path('streams/<int:stream_id>/', views.delete_stream, name='delete_stream'),
path('streams/<str:stream_key>/analyze/', views.trigger_analysis, name='trigger_analysis'), path('streams/<str:stream_key>/analyze/', views.trigger_analysis, name='trigger_analysis'),
path('<str:filename>', views.serve_hls_file, name='serve_hls_file'), path('<str:filename>', views.serve_hls_file, name='serve_hls_file'),
] ]

View File

@@ -3,6 +3,7 @@ from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods from django.views.decorators.http import require_http_methods
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from django.conf import settings from django.conf import settings
from django.core.cache import cache
from .models import VideoStream, StreamStatus from .models import VideoStream, StreamStatus
from .source_adapters import SourceAdapterFactory from .source_adapters import SourceAdapterFactory
from ai_processing.processors.video_analyzer import VideoAnalyzer from ai_processing.processors.video_analyzer import VideoAnalyzer
@@ -140,11 +141,15 @@ def serve_hls_file(request, filename):
logger.info(f"Parsed stream_key: {stream_key} from filename: {filename}") logger.info(f"Parsed stream_key: {stream_key} from filename: {filename}")
if stream_key: if stream_key:
# Get session ID from cache
session_id = cache.get(f"stream_session_{stream_key}")
logger.info(f"Retrieved session_id: {session_id} for stream: {stream_key}")
# Queue for analysis # Queue for analysis
logger.info(f"Attempting to queue analysis for {filename}") logger.info(f"Attempting to queue analysis for {filename}")
analyzer = VideoAnalyzer() analyzer = VideoAnalyzer()
analyzer.queue_segment_analysis(stream_key, file_path) analyzer.queue_segment_analysis(stream_key, file_path, session_id)
logger.info(f"Queued segment for analysis: {filename} (stream: {stream_key})") logger.info(f"Queued segment for analysis: {filename} (stream: {stream_key}, session: {session_id})")
else: else:
logger.warning(f"No stream_key extracted from {filename}") logger.warning(f"No stream_key extracted from {filename}")
@@ -268,3 +273,28 @@ def start_webcam_stream(request):
except Exception as e: except Exception as e:
logger.error(f"Error starting webcam stream: {e}") logger.error(f"Error starting webcam stream: {e}")
return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@require_http_methods(["DELETE"])
def delete_stream(request, stream_id):
"""Delete a stream (only if inactive)"""
try:
stream = get_object_or_404(VideoStream, id=stream_id)
# Cannot delete active streams
if stream.status == StreamStatus.ACTIVE:
return JsonResponse({
'error': f'Cannot delete active stream: {stream.name}. Stop it first.'
}, status=400)
# Delete the stream
stream_name = stream.name
stream.delete()
logger.info(f"Deleted stream: {stream_name} (ID: {stream_id})")
return JsonResponse({'message': f'Stream "{stream_name}" deleted successfully'})
except Exception as e:
logger.error(f"Error deleting stream {stream_id}: {e}")
return JsonResponse({'error': str(e)}, status=500)

View File

@@ -0,0 +1,60 @@
worker_processes 1;
server {
listen 80;
listen [::]:80;
server_name localhost;
# Development: Proxy Angular dev server
location / {
proxy_pass http://host.docker.internal:4200;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache_bypass $http_upgrade;
}
# API proxy (for backend communication)
location /api/ {
proxy_pass http://host.docker.internal:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache_bypass $http_upgrade;
}
# WebSocket proxy (for real-time updates)
location /ws/ {
proxy_pass http://host.docker.internal:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# HLS streaming proxy: route to nginx-rtmp HLS server
location /streaming/ {
# HLS proxy: strip /streaming/ prefix and forward to nginx-rtmp
rewrite ^/streaming/(.*)$ /$1 break;
proxy_pass http://nginx-rtmp;
proxy_http_version 1.1;
proxy_set_header Host $host;
add_header Access-Control-Allow-Origin *;
add_header Cache-Control no-cache;
proxy_buffering off;
}
# Error pages
error_page 404 /index.html;
}

View File

@@ -1,3 +1,5 @@
worker_processes 1;
server { server {
listen 80; listen 80;
listen [::]:80; listen [::]:80;

View File

@@ -18,6 +18,7 @@
"@angular/router": "^17.3.0", "@angular/router": "^17.3.0",
"hls.js": "^1.6.10", "hls.js": "^1.6.10",
"rxjs": "~7.8.0", "rxjs": "~7.8.0",
"semver": "^7.7.2",
"tslib": "^2.3.0", "tslib": "^2.3.0",
"zone.js": "~0.14.3" "zone.js": "~0.14.3"
}, },
@@ -204,6 +205,19 @@
} }
} }
}, },
"node_modules/@angular-devkit/build-angular/node_modules/lru-cache": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz",
"integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
},
"engines": {
"node": ">=10"
}
},
"node_modules/@angular-devkit/build-angular/node_modules/rxjs": { "node_modules/@angular-devkit/build-angular/node_modules/rxjs": {
"version": "7.8.1", "version": "7.8.1",
"resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz",
@@ -214,6 +228,22 @@
"tslib": "^2.1.0" "tslib": "^2.1.0"
} }
}, },
"node_modules/@angular-devkit/build-angular/node_modules/semver": {
"version": "7.6.0",
"resolved": "https://registry.npmjs.org/semver/-/semver-7.6.0.tgz",
"integrity": "sha512-EnwXhrlwXMk9gKu5/flx5sv/an57AkRplG3hTK68W7FRDN+k+OWBj65M7719OkA82XLBxrcX0KSHj+X5COhOVg==",
"dev": true,
"license": "ISC",
"dependencies": {
"lru-cache": "^6.0.0"
},
"bin": {
"semver": "bin/semver.js"
},
"engines": {
"node": ">=10"
}
},
"node_modules/@angular-devkit/build-angular/node_modules/tslib": { "node_modules/@angular-devkit/build-angular/node_modules/tslib": {
"version": "2.6.2", "version": "2.6.2",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz",
@@ -221,6 +251,13 @@
"dev": true, "dev": true,
"license": "0BSD" "license": "0BSD"
}, },
"node_modules/@angular-devkit/build-angular/node_modules/yallist": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==",
"dev": true,
"license": "ISC"
},
"node_modules/@angular-devkit/build-webpack": { "node_modules/@angular-devkit/build-webpack": {
"version": "0.1703.17", "version": "0.1703.17",
"resolved": "https://registry.npmjs.org/@angular-devkit/build-webpack/-/build-webpack-0.1703.17.tgz", "resolved": "https://registry.npmjs.org/@angular-devkit/build-webpack/-/build-webpack-0.1703.17.tgz",
@@ -368,6 +405,42 @@
"yarn": ">= 1.13.0" "yarn": ">= 1.13.0"
} }
}, },
"node_modules/@angular/cli/node_modules/lru-cache": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz",
"integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
},
"engines": {
"node": ">=10"
}
},
"node_modules/@angular/cli/node_modules/semver": {
"version": "7.6.0",
"resolved": "https://registry.npmjs.org/semver/-/semver-7.6.0.tgz",
"integrity": "sha512-EnwXhrlwXMk9gKu5/flx5sv/an57AkRplG3hTK68W7FRDN+k+OWBj65M7719OkA82XLBxrcX0KSHj+X5COhOVg==",
"dev": true,
"license": "ISC",
"dependencies": {
"lru-cache": "^6.0.0"
},
"bin": {
"semver": "bin/semver.js"
},
"engines": {
"node": ">=10"
}
},
"node_modules/@angular/cli/node_modules/yallist": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==",
"dev": true,
"license": "ISC"
},
"node_modules/@angular/common": { "node_modules/@angular/common": {
"version": "17.3.12", "version": "17.3.12",
"resolved": "https://registry.npmjs.org/@angular/common/-/common-17.3.12.tgz", "resolved": "https://registry.npmjs.org/@angular/common/-/common-17.3.12.tgz",
@@ -10794,14 +10867,10 @@
} }
}, },
"node_modules/semver": { "node_modules/semver": {
"version": "7.6.0", "version": "7.7.2",
"resolved": "https://registry.npmjs.org/semver/-/semver-7.6.0.tgz", "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.2.tgz",
"integrity": "sha512-EnwXhrlwXMk9gKu5/flx5sv/an57AkRplG3hTK68W7FRDN+k+OWBj65M7719OkA82XLBxrcX0KSHj+X5COhOVg==", "integrity": "sha512-RF0Fw+rO5AMf9MAyaRXI4AV0Ulj5lMHqVxxdSgiVbixSCXoEmmX/jk0CuJw4+3SqroYO9VoUh+HcuJivvtJemA==",
"dev": true,
"license": "ISC", "license": "ISC",
"dependencies": {
"lru-cache": "^6.0.0"
},
"bin": { "bin": {
"semver": "bin/semver.js" "semver": "bin/semver.js"
}, },
@@ -10809,26 +10878,6 @@
"node": ">=10" "node": ">=10"
} }
}, },
"node_modules/semver/node_modules/lru-cache": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz",
"integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
},
"engines": {
"node": ">=10"
}
},
"node_modules/semver/node_modules/yallist": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==",
"dev": true,
"license": "ISC"
},
"node_modules/send": { "node_modules/send": {
"version": "0.19.0", "version": "0.19.0",
"resolved": "https://registry.npmjs.org/send/-/send-0.19.0.tgz", "resolved": "https://registry.npmjs.org/send/-/send-0.19.0.tgz",

View File

@@ -3,7 +3,7 @@
"version": "0.0.0", "version": "0.0.0",
"scripts": { "scripts": {
"ng": "ng", "ng": "ng",
"start": "ng serve", "start": "ng serve --proxy-config proxy.conf.json",
"build": "ng build", "build": "ng build",
"watch": "ng build --watch --configuration development", "watch": "ng build --watch --configuration development",
"test": "ng test" "test": "ng test"
@@ -20,6 +20,7 @@
"@angular/router": "^17.3.0", "@angular/router": "^17.3.0",
"hls.js": "^1.6.10", "hls.js": "^1.6.10",
"rxjs": "~7.8.0", "rxjs": "~7.8.0",
"semver": "^7.7.2",
"tslib": "^2.3.0", "tslib": "^2.3.0",
"zone.js": "~0.14.3" "zone.js": "~0.14.3"
}, },

View File

@@ -0,0 +1,23 @@
{
"/api/**": {
"target": "http://localhost:8000",
"secure": false,
"logLevel": "debug",
"changeOrigin": true
},
"/streaming/**": {
"target": "http://localhost:8081",
"secure": false,
"logLevel": "debug",
"changeOrigin": true,
"pathRewrite": {
"^/streaming": ""
}
},
"/ws/**": {
"target": "http://localhost:8000",
"secure": false,
"ws": true,
"changeOrigin": true
}
}

View File

@@ -5,10 +5,7 @@
<main class="main-content"> <main class="main-content">
<div class="controls-section"> <div class="controls-section">
<app-stream-control <app-unified-stream-control></app-unified-stream-control>
(streamSelected)="onStreamSelected($event)"
(streamStopped)="onStreamStopped()">
</app-stream-control>
</div> </div>
<div class="viewer-section"> <div class="viewer-section">

View File

@@ -14,7 +14,7 @@
.main-content { .main-content {
display: grid; display: grid;
grid-template-columns: 350px 1fr; grid-template-columns: 450px 1fr;
grid-template-rows: auto auto; grid-template-rows: auto auto;
grid-template-areas: grid-template-areas:
"controls viewer" "controls viewer"
@@ -39,6 +39,14 @@
background: #f8f9fa; background: #f8f9fa;
padding: 1.5rem; padding: 1.5rem;
border-radius: 8px; border-radius: 8px;
> * {
margin-bottom: 1.5rem;
&:last-child {
margin-bottom: 0;
}
}
} }
.viewer-section { .viewer-section {

View File

@@ -1,17 +1,18 @@
import { Component, OnInit, OnDestroy, ViewChild } from '@angular/core'; import { Component, OnInit, OnDestroy, ViewChild } from '@angular/core';
import { RouterOutlet } from '@angular/router'; import { RouterOutlet } from '@angular/router';
import { HttpClientModule } from '@angular/common/http'; import { HttpClientModule } from '@angular/common/http';
import { StreamControlComponent } from './components/stream-control/stream-control.component'; import { UnifiedStreamControlComponent } from './components/unified-stream-control/unified-stream-control.component';
import { StreamViewerComponent } from './components/stream-viewer/stream-viewer.component'; import { StreamViewerComponent } from './components/stream-viewer/stream-viewer.component';
import { AnalysisPanelComponent } from './components/analysis-panel/analysis-panel.component'; import { AnalysisPanelComponent } from './components/analysis-panel/analysis-panel.component';
import { AnalysisService } from './services/analysis.service'; import { AnalysisService } from './services/analysis.service';
import { StreamService } from './services/stream.service'; import { StreamStateService } from './services/stream-state.service';
import { DetectionResult, VisualAnalysis, Analysis } from './models/analysis'; import { DetectionResult, VisualAnalysis, Analysis } from './models/analysis';
import { Subject, takeUntil } from 'rxjs';
@Component({ @Component({
selector: 'app-root', selector: 'app-root',
standalone: true, standalone: true,
imports: [RouterOutlet, HttpClientModule, StreamControlComponent, StreamViewerComponent, AnalysisPanelComponent], imports: [RouterOutlet, HttpClientModule, UnifiedStreamControlComponent, StreamViewerComponent, AnalysisPanelComponent],
templateUrl: './app.component.html', templateUrl: './app.component.html',
styleUrl: './app.component.scss' styleUrl: './app.component.scss'
}) })
@@ -20,95 +21,53 @@ export class AppComponent implements OnInit, OnDestroy {
title = 'Media Analyzer'; title = 'Media Analyzer';
selectedStreamUrl: string = ''; selectedStreamUrl: string = '';
currentStreamId: string = '';
currentDetections: DetectionResult[] = []; currentDetections: DetectionResult[] = [];
currentVisual?: VisualAnalysis; currentVisual?: VisualAnalysis;
recentAnalyses: Analysis[] = []; recentAnalyses: Analysis[] = [];
private destroy$ = new Subject<void>();
constructor( constructor(
private analysisService: AnalysisService, private analysisService: AnalysisService,
private streamService: StreamService private streamStateService: StreamStateService
) {} ) {}
ngOnInit() { ngOnInit() {
// Subscribe to stream URL changes from centralized state
this.streamStateService.currentStreamUrl$
.pipe(takeUntil(this.destroy$))
.subscribe(streamUrl => {
this.selectedStreamUrl = streamUrl;
// Clear stream viewer when URL is empty (stream stopped)
if (!streamUrl && this.streamViewer) {
this.streamViewer.clearStream();
}
});
// Subscribe to analysis updates // Subscribe to analysis updates
this.analysisService.detections$.subscribe(detections => { this.analysisService.detections$
this.currentDetections = detections; .pipe(takeUntil(this.destroy$))
}); .subscribe(detections => {
this.currentDetections = detections;
});
this.analysisService.visual$.subscribe(visual => { this.analysisService.visual$
this.currentVisual = visual || undefined; .pipe(takeUntil(this.destroy$))
}); .subscribe(visual => {
this.currentVisual = visual || undefined;
});
this.analysisService.analyses$.subscribe(analyses => { this.analysisService.analyses$
this.recentAnalyses = analyses; .pipe(takeUntil(this.destroy$))
}); .subscribe(analyses => {
this.recentAnalyses = analyses;
});
} }
ngOnDestroy() { ngOnDestroy() {
this.analysisService.disconnect(); this.destroy$.next();
} this.destroy$.complete();
onStreamSelected(streamUrl: string) {
console.log('App received stream URL:', streamUrl);
// Extract filename from backend URL, then construct a browser-resolvable URL
const filename = streamUrl.split('/').pop() || '';
this.selectedStreamUrl = `/streaming/${filename}`;
console.log('Using HLS URL:', this.selectedStreamUrl);
// Retry function to get active stream (with small delays to allow DB update)
const getActiveStreamWithRetry = (attempt = 1, maxAttempts = 3) => {
this.streamService.getStreams().subscribe({
next: (response) => {
const activeStream = response.streams.find(stream => stream.status === 'active');
if (activeStream) {
this.currentStreamId = activeStream.stream_key;
console.log('Found active stream with key:', this.currentStreamId);
// Connect to WebSocket for this stream
this.analysisService.connectToStream(this.currentStreamId);
} else if (attempt < maxAttempts) {
console.log(`No active stream found (attempt ${attempt}/${maxAttempts}), retrying in 1s...`);
setTimeout(() => getActiveStreamWithRetry(attempt + 1, maxAttempts), 1000);
} else {
console.log('No active stream found after retries, falling back to filename parsing');
this.fallbackToFilenameExtraction(filename);
}
},
error: (error) => {
console.error('Failed to get streams from API:', error);
this.fallbackToFilenameExtraction(filename);
}
});
};
// Start the retry process
getActiveStreamWithRetry();
}
private fallbackToFilenameExtraction(filename: string) {
const streamIdMatch = filename.match(/^([a-zA-Z0-9-]+)\.m3u8$/);
if (streamIdMatch) {
this.currentStreamId = streamIdMatch[1];
console.log('Fallback: Extracted stream ID from filename:', this.currentStreamId);
this.analysisService.connectToStream(this.currentStreamId);
} else {
console.error('Could not extract stream ID from filename:', filename);
}
}
onStreamStopped() {
console.log('Stream stopped - clearing player');
// Clear the stream from player
if (this.streamViewer) {
this.streamViewer.clearStream();
}
// Clear app state
this.selectedStreamUrl = '';
this.currentStreamId = '';
this.currentDetections = [];
this.currentVisual = undefined;
// Disconnect from WebSocket
this.analysisService.disconnect(); this.analysisService.disconnect();
} }
} }

View File

@@ -0,0 +1,45 @@
<div class="analysis-manager">
<h2>🔍 Analysis Manager</h2>
<p class="description">Configure AI analysis features for video streams</p>
<div class="manager-content">
<div class="analysis-features">
<div class="feature-option" *ngFor="let feature of analysisFeatures"
[class.available]="feature.available"
[class.unavailable]="!feature.available">
<label class="checkbox-container">
<input
type="checkbox"
[checked]="feature.enabled"
[disabled]="!feature.available"
(change)="onFeatureToggle(feature)">
<span class="checkbox-checkmark"></span>
<div class="feature-details">
<span class="feature-label">{{ feature.label }}</span>
<span class="feature-description">{{ feature.description }}</span>
</div>
<div class="feature-status" *ngIf="!feature.available">
<span class="coming-soon">Coming Soon</span>
</div>
</label>
</div>
</div>
<div class="status-section">
<div class="status-item">
<span class="status-label">Active Features:</span>
<span class="status-value">{{ activeFeatures }} / {{ availableFeatures }}</span>
</div>
<div class="status-item">
<span class="status-label">Processing Load:</span>
<span class="status-value">{{ activeFeatures > 0 ? 'Light' : 'None' }}</span>
</div>
<div class="status-item">
<span class="status-label">Estimated FPS:</span>
<span class="status-value">{{ activeFeatures > 0 ? '30 FPS' : 'N/A' }}</span>
</div>
</div>
</div>
</div>

View File

@@ -0,0 +1,121 @@
.analysis-manager {
padding: 20px;
}
.analysis-manager h2 {
margin-bottom: 8px;
color: #333;
}
.description {
color: #666;
margin-bottom: 20px;
font-style: italic;
}
.manager-content {
border: 1px solid #ddd;
border-radius: 8px;
padding: 20px;
background: #fafafa;
}
.analysis-features {
margin-bottom: 25px;
}
.feature-option {
margin-bottom: 15px;
}
.feature-option.available .checkbox-container {
cursor: pointer;
border-color: #28a745;
background: white;
}
.feature-option.unavailable .checkbox-container {
cursor: not-allowed;
border-color: #ddd;
background: #f8f8f8;
opacity: 0.6;
}
.checkbox-container {
display: flex;
align-items: center;
justify-content: space-between;
padding: 12px;
border: 1px solid #ddd;
border-radius: 6px;
background: white;
transition: all 0.2s;
}
.checkbox-container:hover.available {
border-color: #007bff;
background: #f8f9ff;
}
.checkbox-container input[type="checkbox"] {
margin-right: 12px;
}
.checkbox-container input[type="checkbox"]:disabled {
opacity: 0.5;
}
.checkbox-checkmark {
margin-right: 12px;
}
.feature-details {
display: flex;
flex-direction: column;
flex-grow: 1;
}
.feature-label {
font-weight: 600;
color: #333;
margin-bottom: 2px;
}
.feature-description {
font-size: 0.9em;
color: #666;
}
.feature-status {
margin-left: 12px;
}
.coming-soon {
background: #ffc107;
color: #333;
padding: 2px 8px;
border-radius: 10px;
font-size: 0.75em;
font-weight: 500;
}
.status-section {
border-top: 1px solid #ddd;
padding-top: 20px;
}
.status-item {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
}
.status-label {
color: #666;
font-weight: 500;
}
.status-value {
color: #333;
font-weight: 600;
}

View File

@@ -0,0 +1,22 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { AnalysisManagerComponent } from './analysis-manager.component';
describe('AnalysisManagerComponent', () => {
let component: AnalysisManagerComponent;
let fixture: ComponentFixture<AnalysisManagerComponent>;
beforeEach(async () => {
await TestBed.configureTestingModule({
imports: [AnalysisManagerComponent]
})
.compileComponents();
fixture = TestBed.createComponent(AnalysisManagerComponent);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@@ -0,0 +1,135 @@
import { Component } from '@angular/core';
import { CommonModule } from '@angular/common';
import { FormsModule } from '@angular/forms';
@Component({
selector: 'app-analysis-manager',
standalone: true,
imports: [CommonModule, FormsModule],
template: `
<div class="source-management disabled">
<div class="section-header" (click)="toggleSection()">
<h4>Analysis Manager</h4>
<span class="toggle-icon">{{ showContent ? '' : '+' }}</span>
</div>
<div class="management-content" *ngIf="showContent">
<div class="analysis-features">
<div class="feature-option" *ngFor="let feature of analysisFeatures">
<label class="checkbox-container">
<input
type="checkbox"
[checked]="feature.enabled"
disabled>
<span>{{ feature.label }}</span>
</label>
</div>
</div>
</div>
</div>
`,
styles: [`
.source-management {
border: 1px solid #dee2e6;
border-radius: 6px;
overflow: hidden;
}
.source-management.disabled {
opacity: 0.6;
}
.section-header {
background: #e9ecef;
padding: 12px 15px;
cursor: pointer;
display: flex;
justify-content: space-between;
align-items: center;
user-select: none;
}
.disabled .section-header {
background: #e0e0e0;
}
.section-header:hover {
background: #dee2e6;
}
.disabled .section-header:hover {
background: #e0e0e0;
}
.section-header h4 {
margin: 0;
color: #495057;
font-size: 14px;
}
.disabled .section-header h4 {
color: #6c757d;
}
.toggle-icon {
font-weight: bold;
color: #6c757d;
}
.management-content {
padding: 15px;
background: white;
}
.disabled .management-content {
background: #f8f8f8;
}
.analysis-features {
margin-bottom: 15px;
}
.feature-option {
margin-bottom: 8px;
}
.checkbox-container {
display: flex;
align-items: center;
gap: 8px;
padding: 6px 0;
}
.checkbox-container input[type="checkbox"] {
opacity: 0.4;
}
.checkbox-container span {
opacity: 0.6;
color: #6c757d;
}
`]
})
export class AnalysisManagerComponent {
showContent = false;
analysisFeatures = [
{ id: 'logo_detection', label: 'Logo Detection', enabled: true, available: true },
{ id: 'visual_properties', label: 'Visual Properties', enabled: false, available: false },
{ id: 'object_detection', label: 'Object Detection', enabled: false, available: false },
{ id: 'audio_transcript', label: 'Audio Transcript', enabled: false, available: false },
{ id: 'text_recognition', label: 'Text Recognition', enabled: false, available: false }
];
toggleSection() {
this.showContent = !this.showContent;
}
get activeFeatures() {
return this.analysisFeatures.filter(f => f.enabled).length;
}
get availableFeatures() {
return this.analysisFeatures.filter(f => f.available).length;
}
}

View File

@@ -1,4 +1,4 @@
import { Component, Input } from '@angular/core'; import { Component, Input, OnChanges } from '@angular/core';
import { CommonModule } from '@angular/common'; import { CommonModule } from '@angular/common';
import { Analysis, DetectionResult, VisualAnalysis } from '../../models/analysis'; import { Analysis, DetectionResult, VisualAnalysis } from '../../models/analysis';
@@ -9,11 +9,14 @@ import { Analysis, DetectionResult, VisualAnalysis } from '../../models/analysis
templateUrl: './analysis-panel.component.html', templateUrl: './analysis-panel.component.html',
styleUrl: './analysis-panel.component.scss' styleUrl: './analysis-panel.component.scss'
}) })
export class AnalysisPanelComponent { export class AnalysisPanelComponent implements OnChanges {
@Input() analyses: Analysis[] = []; @Input() analyses: Analysis[] = [];
@Input() currentDetections: DetectionResult[] = []; @Input() currentDetections: DetectionResult[] = [];
@Input() currentVisual?: VisualAnalysis; @Input() currentVisual?: VisualAnalysis;
ngOnChanges() {
}
getDetectionsByType(type: string): DetectionResult[] { getDetectionsByType(type: string): DetectionResult[] {
return this.currentDetections.filter(d => d.detection_type === type); return this.currentDetections.filter(d => d.detection_type === type);
} }

View File

@@ -0,0 +1,41 @@
<div class="execution-manager">
<h2>⚡ Execution Manager</h2>
<p class="description">Choose where AI processing will be executed</p>
<div class="manager-content disabled">
<div class="execution-modes">
<div class="mode-option" *ngFor="let mode of executionModes">
<label class="radio-container">
<input
type="radio"
name="executionMode"
[value]="mode.id"
[(ngModel)]="selectedExecution"
(change)="onExecutionModeChange(mode.id)"
disabled>
<span class="radio-checkmark"></span>
<div class="mode-details">
<span class="mode-label">{{ mode.label }}</span>
<span class="mode-description">{{ mode.description }}</span>
</div>
</label>
</div>
</div>
<div class="status-section">
<div class="status-item">
<span class="status-label">Current Mode:</span>
<span class="status-value">{{ selectedExecution | titlecase }}</span>
</div>
<div class="status-item">
<span class="status-label">Available Workers:</span>
<span class="status-value">1 Local</span>
</div>
<div class="feature-badge">
Coming Soon
</div>
</div>
</div>
</div>

View File

@@ -0,0 +1,113 @@
.execution-manager {
padding: 20px;
}
.execution-manager h2 {
margin-bottom: 8px;
color: #333;
}
.description {
color: #666;
margin-bottom: 20px;
font-style: italic;
}
.manager-content {
border: 1px solid #ddd;
border-radius: 8px;
padding: 20px;
background: #fafafa;
}
.manager-content.disabled {
opacity: 0.6;
background: #f8f8f8;
border-color: #ccc;
}
.execution-modes {
margin-bottom: 25px;
}
.mode-option {
margin-bottom: 15px;
}
.radio-container {
display: flex;
align-items: center;
cursor: not-allowed;
padding: 12px;
border: 1px solid #ddd;
border-radius: 6px;
background: white;
transition: all 0.2s;
}
.radio-container:hover {
border-color: #bbb;
}
.radio-container input[type="radio"] {
margin-right: 12px;
cursor: not-allowed;
}
.radio-container input[type="radio"]:disabled {
opacity: 0.5;
}
.radio-checkmark {
margin-right: 12px;
}
.mode-details {
display: flex;
flex-direction: column;
}
.mode-label {
font-weight: 600;
color: #333;
margin-bottom: 2px;
}
.mode-description {
font-size: 0.9em;
color: #666;
}
.status-section {
border-top: 1px solid #ddd;
padding-top: 20px;
position: relative;
}
.status-item {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
}
.status-label {
color: #666;
font-weight: 500;
}
.status-value {
color: #333;
font-weight: 600;
}
.feature-badge {
position: absolute;
top: -10px;
right: 10px;
background: #17a2b8;
color: white;
padding: 4px 12px;
border-radius: 12px;
font-size: 0.8em;
font-weight: 500;
}

View File

@@ -0,0 +1,22 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { ExecutionManagerComponent } from './execution-manager.component';
describe('ExecutionManagerComponent', () => {
let component: ExecutionManagerComponent;
let fixture: ComponentFixture<ExecutionManagerComponent>;
beforeEach(async () => {
await TestBed.configureTestingModule({
imports: [ExecutionManagerComponent]
})
.compileComponents();
fixture = TestBed.createComponent(ExecutionManagerComponent);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@@ -0,0 +1,128 @@
import { Component } from '@angular/core';
import { CommonModule } from '@angular/common';
import { FormsModule } from '@angular/forms';
@Component({
selector: 'app-execution-manager',
standalone: true,
imports: [CommonModule, FormsModule],
template: `
<div class="source-management disabled">
<div class="section-header" (click)="toggleSection()">
<h4>Execution Manager</h4>
<span class="toggle-icon">{{ showContent ? '' : '+' }}</span>
</div>
<div class="management-content" *ngIf="showContent">
<div class="execution-modes">
<div class="mode-option" *ngFor="let mode of executionModes">
<label class="radio-container">
<input
type="radio"
name="executionMode"
[value]="mode.id"
[(ngModel)]="selectedExecution"
disabled>
<span>{{ mode.label }}</span>
</label>
</div>
</div>
</div>
</div>
`,
styles: [`
.source-management {
border: 1px solid #dee2e6;
border-radius: 6px;
overflow: hidden;
}
.source-management.disabled {
opacity: 0.6;
}
.section-header {
background: #e9ecef;
padding: 12px 15px;
cursor: pointer;
display: flex;
justify-content: space-between;
align-items: center;
user-select: none;
}
.disabled .section-header {
background: #e0e0e0;
}
.section-header:hover {
background: #dee2e6;
}
.disabled .section-header:hover {
background: #e0e0e0;
}
.section-header h4 {
margin: 0;
color: #495057;
font-size: 14px;
}
.disabled .section-header h4 {
color: #6c757d;
}
.toggle-icon {
font-weight: bold;
color: #6c757d;
}
.management-content {
padding: 15px;
background: white;
}
.disabled .management-content {
background: #f8f8f8;
}
.execution-modes {
margin-bottom: 15px;
}
.mode-option {
margin-bottom: 8px;
}
.radio-container {
display: flex;
align-items: center;
gap: 8px;
padding: 6px 0;
}
.radio-container input[type="radio"] {
opacity: 0.4;
}
.radio-container span {
opacity: 0.6;
color: #6c757d;
}
`]
})
export class ExecutionManagerComponent {
selectedExecution: string = 'local';
showContent = false;
executionModes = [
{ id: 'local', label: 'Local' },
{ id: 'lan', label: 'LAN' },
{ id: 'cloud', label: 'Cloud' }
];
toggleSection() {
this.showContent = !this.showContent;
}
}

View File

@@ -7,8 +7,11 @@
</div> </div>
<div class="controls"> <div class="controls">
<button (click)="play()" class="btn">Play</button> <button (click)="togglePlayPause()" class="btn play-pause">
<button (click)="pause()" class="btn">Pause</button> {{ isPlaying ? 'Pause' : 'Play' }}
<button (click)="toggleOverlay()" class="btn">{{showOverlay ? 'Hide' : 'Show'}} Detections</button> </button>
<button (click)="toggleOverlay()" class="btn">
{{ showOverlay ? 'Hide Detections' : 'Show Detections' }}
</button>
</div> </div>
</div> </div>

View File

@@ -16,6 +16,7 @@ export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChange
@Input() detections: DetectionResult[] = []; @Input() detections: DetectionResult[] = [];
showOverlay = true; showOverlay = true;
isPlaying = false;
private hls?: Hls; private hls?: Hls;
private ctx?: CanvasRenderingContext2D; private ctx?: CanvasRenderingContext2D;
@@ -24,6 +25,12 @@ export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChange
if (this.streamUrl) { if (this.streamUrl) {
this.loadStream(this.streamUrl); this.loadStream(this.streamUrl);
} }
// Set up video event listeners
const video = this.videoElement.nativeElement;
video.addEventListener('play', () => this.isPlaying = true);
video.addEventListener('pause', () => this.isPlaying = false);
video.addEventListener('ended', () => this.isPlaying = false);
} }
ngOnChanges(changes: SimpleChanges) { ngOnChanges(changes: SimpleChanges) {
@@ -64,6 +71,8 @@ export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChange
this.hls.on(Hls.Events.MANIFEST_LOADED, () => { this.hls.on(Hls.Events.MANIFEST_LOADED, () => {
console.log('HLS manifest loaded'); console.log('HLS manifest loaded');
// Autoplay when manifest is loaded
this.autoPlay();
}); });
this.hls.on(Hls.Events.ERROR, (event, data) => { this.hls.on(Hls.Events.ERROR, (event, data) => {
@@ -75,11 +84,32 @@ export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChange
} else if (video.canPlayType('application/vnd.apple.mpegurl')) { } else if (video.canPlayType('application/vnd.apple.mpegurl')) {
// Native HLS support (Safari) // Native HLS support (Safari)
video.src = url; video.src = url;
video.addEventListener('loadedmetadata', () => this.autoPlay());
} else { } else {
console.error('HLS not supported'); console.error('HLS not supported');
} }
} }
private async autoPlay() {
try {
const video = this.videoElement.nativeElement;
video.muted = true; // Required for autoplay in most browsers
await video.play();
console.log('Video autoplay started');
} catch (error) {
console.log('Autoplay failed, user interaction required:', error);
}
}
async togglePlayPause() {
const video = this.videoElement.nativeElement;
if (video.paused) {
await this.play();
} else {
this.pause();
}
}
async play() { async play() {
try { try {
const video = this.videoElement.nativeElement; const video = this.videoElement.nativeElement;

View File

@@ -0,0 +1,682 @@
import { Component, OnInit, OnDestroy } from '@angular/core';
import { CommonModule } from '@angular/common';
import { FormsModule } from '@angular/forms';
import { Subject, takeUntil } from 'rxjs';
import { StreamStateService, StreamState, StreamSession } from '../../services/stream-state.service';
import { Stream } from '../../models/stream';
import { ExecutionManagerComponent } from '../execution-manager/execution-manager.component';
import { AnalysisManagerComponent } from '../analysis-manager/analysis-manager.component';
@Component({
selector: 'app-unified-stream-control',
standalone: true,
imports: [CommonModule, FormsModule, ExecutionManagerComponent, AnalysisManagerComponent],
template: `
<div class="stream-control-panel">
<div class="panel-header">
<h3>Control Panel</h3>
<div class="status-indicator" [class]="getStatusClass()">
{{ getStatusText() }}
</div>
</div>
<!-- Error Display -->
<div class="error-message" *ngIf="streamState.error">
{{ streamState.error }}
<button (click)="clearError()" class="clear-error">✕</button>
</div>
<!-- Main Controls - Always Visible -->
<div class="main-controls">
<!-- Source Selection -->
<div class="source-selection">
<label class="source-option">
<input
type="radio"
name="sourceType"
value="webcam"
[(ngModel)]="selectedSourceType"
[disabled]="streamState.isLoading || isStreaming">
<span>Webcam</span>
</label>
<label class="source-option">
<input
type="radio"
name="sourceType"
value="rtmp"
[(ngModel)]="selectedSourceType"
[disabled]="streamState.isLoading || isStreaming">
<span>RTMP Stream</span>
</label>
</div>
<!-- Auto-select first available RTMP stream when RTMP is selected -->
<div class="rtmp-info" *ngIf="selectedSourceType === 'rtmp' && rtmpStreams.length > 0">
<span class="stream-info">Stream: {{ rtmpStreams[0].stream_key }}</span>
</div>
<div class="rtmp-info" *ngIf="selectedSourceType === 'rtmp' && rtmpStreams.length === 0">
<span class="no-streams">No RTMP streams available</span>
</div>
<!-- Start/Stop Button -->
<div class="action-buttons">
<button
*ngIf="!isStreaming"
class="start-button"
(click)="startSelectedSource()"
[disabled]="streamState.isLoading || !canStart()">
{{ streamState.isLoading ? 'Starting...' : 'Start Stream' }}
</button>
<button
*ngIf="isStreaming"
class="stop-button"
(click)="stopStream()"
[disabled]="streamState.isLoading">
{{ streamState.isLoading ? 'Stopping...' : 'Stop Stream' }}
</button>
</div>
</div>
<!-- Current Session Info -->
<div class="current-session" *ngIf="streamState.currentSession">
<h4>Active Stream</h4>
<div class="session-info">
<div class="session-detail">
<span class="label">Type:</span>
<span class="value">{{ streamState.currentSession.sourceType.toUpperCase() }}</span>
</div>
<div class="session-detail">
<span class="label">Key:</span>
<span class="value">{{ streamState.currentSession.streamKey }}</span>
</div>
<div class="session-detail">
<span class="label">Started:</span>
<span class="value">{{ formatTime(streamState.currentSession.startedAt) }}</span>
</div>
<div class="session-detail">
<span class="label">Session ID:</span>
<span class="value">{{ streamState.currentSession.id }}</span>
</div>
</div>
</div>
<!-- Stream Manager (collapsible section) -->
<div class="source-management">
<div class="section-header" (click)="toggleSourceManagement()">
<h4>Stream Manager</h4>
<span class="toggle-icon">{{ showSourceManagement ? '' : '+' }}</span>
</div>
<div class="management-content" *ngIf="showSourceManagement">
<!-- Available Sources -->
<div class="available-sources" *ngIf="allStreams.length > 0">
<h5>Available Sources</h5>
<div class="source-list">
<div
class="source-item"
*ngFor="let stream of allStreams"
[class.active]="stream.status === 'active'">
<div class="source-header">
<div class="source-type-badge" [class]="stream.source_type">
{{ stream.source_type.toUpperCase() }}
</div>
<div class="source-actions">
<button
class="delete-button"
(click)="deleteStream(stream)"
[disabled]="streamState.isLoading || stream.status === 'active'"
title="Delete source">
🗑️
</button>
</div>
</div>
<div class="source-info">
<div class="info-row">
<span class="label">Stream Key:</span>
<span class="value mono">{{ stream.stream_key }}</span>
</div>
<div class="info-row">
<span class="label">Status:</span>
<span class="status-badge" [class]="stream.status">{{ stream.status }}</span>
</div>
<div class="info-row" *ngIf="stream.hls_playlist_url">
<span class="label">HLS URL:</span>
<span class="value mono small">{{ stream.hls_playlist_url }}</span>
</div>
</div>
</div>
</div>
</div>
<div class="no-sources" *ngIf="allStreams.length === 0">
<p>No sources available.</p>
</div>
</div>
</div>
<!-- Execution Manager -->
<app-execution-manager></app-execution-manager>
<!-- Analysis Manager -->
<app-analysis-manager></app-analysis-manager>
<!-- Loading Overlay -->
<div class="loading-overlay" *ngIf="streamState.isLoading">
<div class="spinner"></div>
</div>
</div>
`,
styles: [`
.stream-control-panel {
background: #f8f9fa;
border: 1px solid #dee2e6;
border-radius: 8px;
padding: 20px;
position: relative;
min-height: 200px;
}
.panel-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
}
.panel-header h3 {
margin: 0;
color: #343a40;
}
.status-indicator {
padding: 4px 12px;
border-radius: 12px;
font-size: 12px;
font-weight: 600;
text-transform: uppercase;
}
.status-indicator.streaming {
background: #d4edda;
color: #155724;
}
.status-indicator.idle {
background: #f8d7da;
color: #721c24;
}
.status-indicator.loading {
background: #fff3cd;
color: #856404;
}
.error-message {
background: #f8d7da;
color: #721c24;
padding: 10px;
border-radius: 4px;
margin-bottom: 15px;
display: flex;
justify-content: space-between;
align-items: center;
}
.clear-error {
background: none;
border: none;
color: #721c24;
cursor: pointer;
font-size: 16px;
}
/* Main Controls Section */
.main-controls {
background: white;
border: 1px solid #dee2e6;
border-radius: 6px;
padding: 20px;
margin-bottom: 20px;
}
.source-selection {
display: flex;
gap: 20px;
margin-bottom: 15px;
}
.source-option {
display: flex;
align-items: center;
gap: 8px;
cursor: pointer;
font-weight: 500;
}
.source-option input[type="radio"] {
margin: 0;
}
.rtmp-info {
margin-bottom: 15px;
padding: 8px 12px;
border-radius: 4px;
font-size: 14px;
}
.rtmp-info .stream-info {
color: #155724;
background: #d4edda;
padding: 4px 8px;
border-radius: 3px;
font-family: monospace;
}
.rtmp-info .no-streams {
color: #721c24;
font-style: italic;
}
.action-buttons {
display: flex;
justify-content: center;
}
.start-button, .stop-button {
padding: 12px 32px;
border: none;
border-radius: 6px;
font-weight: 600;
font-size: 16px;
cursor: pointer;
transition: all 0.2s;
min-width: 140px;
}
.start-button {
background: #28a745;
color: white;
}
.start-button:hover:not(:disabled) {
background: #1e7e34;
transform: translateY(-1px);
}
.stop-button {
background: #dc3545;
color: white;
}
.stop-button:hover:not(:disabled) {
background: #c82333;
transform: translateY(-1px);
}
button:disabled {
opacity: 0.6;
cursor: not-allowed;
transform: none !important;
}
/* Current Session Display */
.current-session {
background: #d4edda;
border: 1px solid #c3e6cb;
border-radius: 6px;
padding: 15px;
margin-bottom: 20px;
}
.current-session h4 {
margin: 0 0 10px 0;
color: #155724;
}
.session-info {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 10px;
}
.session-detail {
display: flex;
flex-direction: column;
}
.session-detail .label {
font-weight: 600;
font-size: 12px;
color: #155724;
text-transform: uppercase;
margin-bottom: 2px;
}
.session-detail .value {
color: #155724;
font-family: monospace;
font-size: 14px;
}
/* Source Management Section */
.source-management {
border: 1px solid #dee2e6;
border-radius: 6px;
overflow: hidden;
}
.section-header {
background: #e9ecef;
padding: 12px 15px;
cursor: pointer;
display: flex;
justify-content: space-between;
align-items: center;
user-select: none;
}
.section-header:hover {
background: #dee2e6;
}
.section-header h4 {
margin: 0;
color: #495057;
font-size: 14px;
}
.toggle-icon {
font-weight: bold;
color: #6c757d;
}
.management-content {
padding: 15px;
background: white;
}
/* Source List Styles */
.source-list {
margin-top: 15px;
}
.source-item {
border: 1px solid #dee2e6;
border-radius: 6px;
margin-bottom: 15px;
overflow: hidden;
}
.source-item.active {
border-color: #28a745;
background: #f8fff9;
}
.source-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 10px 15px;
background: #f8f9fa;
border-bottom: 1px solid #dee2e6;
}
.source-type-badge {
padding: 4px 12px;
border-radius: 12px;
font-size: 11px;
font-weight: 600;
text-transform: uppercase;
}
.source-type-badge.webcam {
background: #007bff;
color: white;
}
.source-type-badge.rtmp {
background: #6f42c1;
color: white;
}
.delete-button {
background: #dc3545;
color: white;
border: none;
border-radius: 4px;
padding: 6px 10px;
cursor: pointer;
font-size: 16px;
transition: background-color 0.2s;
}
.delete-button:hover:not(:disabled) {
background: #c82333;
}
.delete-button:disabled {
background: #6c757d;
cursor: not-allowed;
}
.source-info {
padding: 15px;
}
.info-row {
display: flex;
margin-bottom: 8px;
align-items: flex-start;
}
.info-row:last-child {
margin-bottom: 0;
}
.info-row .label {
font-weight: 600;
color: #495057;
width: 100px;
flex-shrink: 0;
font-size: 12px;
text-transform: uppercase;
}
.info-row .value {
color: #212529;
flex: 1;
word-break: break-all;
}
.info-row .value.mono {
font-family: 'Courier New', monospace;
background: #f8f9fa;
padding: 2px 6px;
border-radius: 3px;
font-size: 11px;
}
.info-row .value.small {
font-size: 11px;
}
.status-badge {
padding: 3px 8px;
border-radius: 12px;
font-size: 10px;
font-weight: 600;
text-transform: uppercase;
}
.status-badge.active {
background: #d4edda;
color: #155724;
}
.status-badge.inactive {
background: #f8d7da;
color: #721c24;
}
.no-sources {
text-align: center;
padding: 40px 20px;
color: #6c757d;
font-style: italic;
}
.no-sources p {
margin: 0;
}
.available-sources h5 {
margin: 0 0 15px 0;
color: #495057;
font-size: 14px;
font-weight: 600;
}
/* Loading Overlay */
.loading-overlay {
position: absolute;
top: 0;
left: 0;
right: 0;
bottom: 0;
background: rgba(255, 255, 255, 0.9);
display: flex;
align-items: center;
justify-content: center;
border-radius: 8px;
z-index: 10;
}
.spinner {
width: 40px;
height: 40px;
border: 4px solid #f3f3f3;
border-top: 4px solid #007bff;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
`]
})
export class UnifiedStreamControlComponent implements OnInit, OnDestroy {
streamState: StreamState = {
isLoading: false,
currentSession: null,
availableStreams: [],
error: null
};
selectedSourceType: 'webcam' | 'rtmp' = 'webcam';
showSourceManagement = false;
private destroy$ = new Subject<void>();
constructor(private streamStateService: StreamStateService) {}
ngOnInit() {
this.streamStateService.state$
.pipe(takeUntil(this.destroy$))
.subscribe(state => {
this.streamState = state;
// Update source selection based on active stream
if (state.currentSession) {
this.selectedSourceType = state.currentSession.sourceType as 'webcam' | 'rtmp';
}
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
async startSelectedSource() {
if (this.selectedSourceType === 'webcam') {
await this.streamStateService.startWebcamStream();
} else if (this.selectedSourceType === 'rtmp' && this.rtmpStreams.length > 0) {
await this.streamStateService.startRtmpStream(this.rtmpStreams[0].stream_key);
}
}
async stopStream() {
await this.streamStateService.stopCurrentStream();
}
toggleSourceManagement() {
this.showSourceManagement = !this.showSourceManagement;
}
async deleteStream(stream: Stream) {
if (stream.status === 'active') {
alert('Cannot delete an active stream. Stop it first.');
return;
}
if (!confirm(`Are you sure you want to delete "${stream.name}"?`)) {
return;
}
try {
// Call the backend API to delete the stream
await this.streamStateService.deleteStream(stream.id);
} catch (error) {
console.error('Failed to delete stream:', error);
alert('Failed to delete stream. Please try again.');
}
}
canStart(): boolean {
if (this.selectedSourceType === 'webcam') {
return true;
}
return this.selectedSourceType === 'rtmp' && this.rtmpStreams.length > 0;
}
clearError() {
// Update state to clear error
const currentState = this.streamState;
this.streamState = { ...currentState, error: null };
}
get rtmpStreams(): Stream[] {
return this.streamState.availableStreams.filter(stream => stream.source_type === 'rtmp');
}
get allStreams(): Stream[] {
return this.streamState.availableStreams;
}
get isStreaming(): boolean {
return !!this.streamState.currentSession;
}
getStatusText(): string {
if (this.streamState.isLoading) return 'Loading';
if (this.streamState.currentSession) return 'Streaming';
return 'Idle';
}
getStatusClass(): string {
if (this.streamState.isLoading) return 'loading';
if (this.streamState.currentSession) return 'streaming';
return 'idle';
}
formatTime(date: Date): string {
return new Date(date).toLocaleTimeString();
}
}

View File

@@ -26,6 +26,7 @@ export interface VisualAnalysis {
export interface Analysis { export interface Analysis {
id: string; id: string;
stream_id: string; stream_id: string;
session_id?: string;
timestamp: string; timestamp: string;
processing_time?: number; processing_time?: number;
analysis_type: string; analysis_type: string;

View File

@@ -10,7 +10,8 @@ export class AnalysisService {
private currentDetections = new BehaviorSubject<DetectionResult[]>([]); private currentDetections = new BehaviorSubject<DetectionResult[]>([]);
private currentVisual = new BehaviorSubject<VisualAnalysis | null>(null); private currentVisual = new BehaviorSubject<VisualAnalysis | null>(null);
private recentAnalyses = new BehaviorSubject<Analysis[]>([]); private recentAnalyses = new BehaviorSubject<Analysis[]>([]);
private streamStartTime: Date | null = null; private currentSessionId: string | null = null;
private connectedStreamKey: string | null = null;
public detections$ = this.currentDetections.asObservable(); public detections$ = this.currentDetections.asObservable();
public visual$ = this.currentVisual.asObservable(); public visual$ = this.currentVisual.asObservable();
@@ -23,35 +24,43 @@ export class AnalysisService {
}); });
} }
connectToStream(streamId: string) { connectToStream(streamKey: string, sessionId?: string) {
this.streamStartTime = new Date(); // Set current session for filtering
this.websocketService.subscribe(streamId); this.currentSessionId = sessionId || `session_${Date.now()}`;
this.connectedStreamKey = streamKey;
// Clear existing analysis data when starting new session
this.clearAnalysis();
// Connect to WebSocket with session ID
this.websocketService.subscribe(streamKey, this.currentSessionId);
console.log('Connected to stream analysis:', {
streamKey,
sessionId: this.currentSessionId
});
} }
disconnect() { disconnect() {
this.websocketService.unsubscribe(); this.websocketService.unsubscribe();
this.websocketService.disconnect(); this.websocketService.disconnect();
this.currentDetections.next([]); this.clearAnalysis();
this.currentVisual.next(null); this.currentSessionId = null;
this.streamStartTime = null; this.connectedStreamKey = null;
} }
private handleAnalysisUpdate(analysis: Analysis) { private handleAnalysisUpdate(analysis: Analysis) {
console.log('Received analysis update:', analysis); // Only process analysis if we have an active session
if (!this.currentSessionId) {
// Filter out analysis from before stream started (with 30 second buffer for recent analysis) return;
if (this.streamStartTime && analysis.timestamp) {
const analysisTime = new Date(analysis.timestamp);
const bufferTime = new Date(this.streamStartTime.getTime() - 30000); // 30 seconds before stream start
if (analysisTime < bufferTime) {
console.log('Ignoring old analysis from before stream start:', {
analysisTime: analysisTime.toISOString(),
streamStart: this.streamStartTime.toISOString()
});
return;
}
} }
// Filter by session ID - only process analysis for current session
if (analysis.session_id && analysis.session_id !== this.currentSessionId) {
return;
}
// Update recent analyses list // Update recent analyses list
const current = this.recentAnalyses.value; const current = this.recentAnalyses.value;
const updated = [analysis, ...current.slice(0, 9)]; // Keep last 10 const updated = [analysis, ...current.slice(0, 9)]; // Keep last 10
@@ -66,11 +75,6 @@ export class AnalysisService {
this.currentVisual.next(analysis.visual); this.currentVisual.next(analysis.visual);
} }
console.log('Analysis update:', {
detections: detections.length,
visual: !!analysis.visual,
timestamp: analysis.timestamp
});
} }
getCurrentDetections(): DetectionResult[] { getCurrentDetections(): DetectionResult[] {
@@ -90,4 +94,12 @@ export class AnalysisService {
this.currentVisual.next(null); this.currentVisual.next(null);
this.recentAnalyses.next([]); this.recentAnalyses.next([]);
} }
getCurrentSessionId(): string | null {
return this.currentSessionId;
}
isConnectedToStream(streamKey: string): boolean {
return this.connectedStreamKey === streamKey && !!this.currentSessionId;
}
} }

View File

@@ -0,0 +1,301 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, combineLatest, distinctUntilChanged, map } from 'rxjs';
import { StreamService } from './stream.service';
import { AnalysisService } from './analysis.service';
import { Stream } from '../models/stream';
export interface StreamSession {
id: string;
streamKey: string;
hlsUrl: string;
sourceType: 'webcam' | 'rtmp';
startedAt: Date;
}
export interface StreamState {
isLoading: boolean;
currentSession: StreamSession | null;
availableStreams: Stream[];
error: string | null;
}
@Injectable({
providedIn: 'root'
})
export class StreamStateService {
private readonly SESSION_KEY = 'media_analyzer_session';
private state = new BehaviorSubject<StreamState>({
isLoading: false,
currentSession: null,
availableStreams: [],
error: null
});
public state$ = this.state.asObservable();
// Derived observables for common use cases
public isStreaming$ = this.state$.pipe(
map(state => !!state.currentSession),
distinctUntilChanged()
);
public currentStreamUrl$ = this.state$.pipe(
map(state => state.currentSession?.hlsUrl || ''),
distinctUntilChanged()
);
public isLoading$ = this.state$.pipe(
map(state => state.isLoading),
distinctUntilChanged()
);
constructor(
private streamService: StreamService,
private analysisService: AnalysisService
) {
this.loadAvailableStreams();
this.restoreSession();
// Auto-connect to active streams after loading
this.autoConnectToActiveStream();
}
// Stream Operations
async startWebcamStream(): Promise<void> {
this.updateState({ isLoading: true, error: null });
try {
// Stop any existing stream first
await this.stopCurrentStream();
const stream = await this.streamService.startWebcamStream().toPromise();
if (!stream) throw new Error('Failed to start webcam stream');
const session = this.createSession(stream.stream_key, stream.hls_playlist_url || '', 'webcam');
await this.activateSession(session);
} catch (error: any) {
this.handleError(error);
} finally {
this.updateState({ isLoading: false });
}
}
async startRtmpStream(streamKey: string): Promise<void> {
this.updateState({ isLoading: true, error: null });
try {
// Stop any existing stream first
await this.stopCurrentStream();
const response = await this.streamService.startStream(streamKey).toPromise();
if (!response) throw new Error('Failed to start RTMP stream');
const session = this.createSession(streamKey, response.hls_playlist_url, 'rtmp');
await this.activateSession(session);
} catch (error: any) {
this.handleError(error);
} finally {
this.updateState({ isLoading: false });
}
}
async stopCurrentStream(): Promise<void> {
const currentSession = this.state.value.currentSession;
if (!currentSession) return;
this.updateState({ isLoading: true, error: null });
try {
// Stop backend stream
await this.streamService.stopStream(currentSession.streamKey).toPromise();
// Disconnect analysis service
this.analysisService.disconnect();
// Clear session
this.clearSession();
} catch (error: any) {
this.handleError(error);
} finally {
this.updateState({ isLoading: false });
this.loadAvailableStreams(); // Refresh stream list
}
}
async createRtmpStream(name: string): Promise<void> {
this.updateState({ isLoading: true, error: null });
try {
const stream = await this.streamService.createStream({
name,
source_type: 'rtmp',
processing_mode: 'live'
}).toPromise();
if (stream) {
await this.loadAvailableStreams();
}
} catch (error: any) {
this.handleError(error);
} finally {
this.updateState({ isLoading: false });
}
}
async deleteStream(streamId: number): Promise<void> {
this.updateState({ isLoading: true, error: null });
try {
await this.streamService.deleteStream(streamId).toPromise();
await this.loadAvailableStreams();
} catch (error: any) {
this.handleError(error);
} finally {
this.updateState({ isLoading: false });
}
}
// Session Management
private createSession(streamKey: string, hlsUrl: string, sourceType: 'webcam' | 'rtmp'): StreamSession {
const session: StreamSession = {
id: this.generateSessionId(),
streamKey,
hlsUrl: this.normalizeHlsUrl(hlsUrl),
sourceType,
startedAt: new Date()
};
this.persistSession(session);
return session;
}
private async activateSession(session: StreamSession): Promise<void> {
// Update state first
this.updateState({ currentSession: session });
// Connect to analysis WebSocket with session ID
this.analysisService.connectToStream(session.streamKey, session.id);
// Wait a moment for the stream to be ready
await new Promise(resolve => setTimeout(resolve, 1000));
// Refresh available streams to show current status
await this.loadAvailableStreams();
}
private clearSession(): void {
localStorage.removeItem(this.SESSION_KEY);
this.updateState({ currentSession: null });
}
private persistSession(session: StreamSession): void {
localStorage.setItem(this.SESSION_KEY, JSON.stringify(session));
}
private restoreSession(): void {
try {
const stored = localStorage.getItem(this.SESSION_KEY);
if (stored) {
const session: StreamSession = JSON.parse(stored);
// Only restore if session is recent (within last hour)
const sessionAge = Date.now() - new Date(session.startedAt).getTime();
if (sessionAge < 3600000) { // 1 hour
this.updateState({ currentSession: session });
this.analysisService.connectToStream(session.streamKey, session.id);
} else {
this.clearSession();
}
}
} catch (error) {
console.warn('Failed to restore session:', error);
this.clearSession();
}
}
// Auto-connection Logic
private autoConnectToActiveStream(): void {
// Wait a moment for streams to load
setTimeout(async () => {
const currentSession = this.state.value.currentSession;
if (currentSession) {
// Already have a session, don't auto-connect
return;
}
// Look for active streams
const streams = this.state.value.availableStreams;
const activeStream = streams.find(s => s.status === 'active');
if (activeStream) {
console.log('Auto-connecting to active stream:', activeStream.stream_key);
// Create a session for the active stream
const session = this.createSession(
activeStream.stream_key,
activeStream.hls_playlist_url || '',
activeStream.source_type as 'webcam' | 'rtmp'
);
this.updateState({ currentSession: session });
// Connect analysis with the session ID
this.analysisService.connectToStream(session.streamKey, session.id);
}
}, 1000);
}
// Utility Methods
private async loadAvailableStreams(): Promise<void> {
try {
const response = await this.streamService.getStreams().toPromise();
if (response) {
this.updateState({ availableStreams: response.streams });
}
} catch (error) {
console.error('Failed to load streams:', error);
}
}
private normalizeHlsUrl(hlsUrl: string): string {
// Convert backend URL to direct nginx URL via proxy
const filename = hlsUrl.split('/').pop() || '';
return `/streaming/${filename}`;
}
private generateSessionId(): string {
return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
private updateState(partial: Partial<StreamState>): void {
const current = this.state.value;
this.state.next({ ...current, ...partial });
}
private handleError(error: any): void {
let errorMessage = 'An unknown error occurred';
if (error.status === 409) {
errorMessage = error.error?.error || 'Stream conflict - another stream may be active';
} else if (error.error?.error) {
errorMessage = error.error.error;
} else if (error.message) {
errorMessage = error.message;
}
console.error('Stream operation error:', error);
this.updateState({ error: errorMessage });
}
// Getters for current state
getCurrentSession(): StreamSession | null {
return this.state.value.currentSession;
}
getAvailableStreams(): Stream[] {
return this.state.value.availableStreams;
}
isCurrentlyStreaming(): boolean {
return !!this.state.value.currentSession;
}
}

View File

@@ -31,4 +31,8 @@ export class StreamService {
stopStream(streamKey: string): Observable<{message: string}> { stopStream(streamKey: string): Observable<{message: string}> {
return this.http.post<{message: string}>(`${this.apiUrl}/streams/${streamKey}/stop/`, {}); return this.http.post<{message: string}>(`${this.apiUrl}/streams/${streamKey}/stop/`, {});
} }
deleteStream(streamId: number): Observable<{message: string}> {
return this.http.delete<{message: string}>(`${this.apiUrl}/streams/${streamId}/`);
}
} }

View File

@@ -35,7 +35,6 @@ export class WebsocketService {
this.socket.onmessage = (event) => { this.socket.onmessage = (event) => {
try { try {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
console.log('WebSocket message:', data);
if (data.type === 'analysis_update') { if (data.type === 'analysis_update') {
this.analysisSubject.next(data.analysis); this.analysisSubject.next(data.analysis);
@@ -43,6 +42,9 @@ export class WebsocketService {
data.analyses.forEach((analysis: Analysis) => { data.analyses.forEach((analysis: Analysis) => {
this.analysisSubject.next(analysis); this.analysisSubject.next(analysis);
}); });
} else if (data.type === 'pong') {
} else {
console.log('❓ Unknown message type:', data.type);
} }
} catch (error) { } catch (error) {
console.error('Error parsing WebSocket message:', error); console.error('Error parsing WebSocket message:', error);
@@ -72,6 +74,7 @@ export class WebsocketService {
send(message: any) { send(message: any) {
if (this.socket?.readyState === WebSocket.OPEN) { if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message)); this.socket.send(JSON.stringify(message));
} else {
} }
} }
@@ -82,10 +85,29 @@ export class WebsocketService {
}); });
} }
subscribe(streamId: string) { subscribe(streamId: string, sessionId?: string) {
this.currentStreamId = streamId; this.currentStreamId = streamId;
this.connect(); this.connect();
this.send({ type: 'subscribe', stream_id: streamId });
// Wait for connection to be open before subscribing
const message: any = { type: 'subscribe', stream_id: streamId };
if (sessionId) {
message.session_id = sessionId;
}
if (this.socket?.readyState === WebSocket.OPEN) {
this.send(message);
} else {
// Wait for WebSocket to open, then subscribe
const checkAndSend = () => {
if (this.socket?.readyState === WebSocket.OPEN) {
this.send(message);
} else {
setTimeout(checkAndSend, 100);
}
};
setTimeout(checkAndSend, 100);
}
} }
unsubscribe() { unsubscribe() {

View File

@@ -0,0 +1,17 @@
{
"name": "media-analyzer",
"version": "1.0.0",
"description": "Real-time video analysis platform",
"scripts": {
"dev:backend": "./start-backend-only.sh",
"dev:frontend": "./start-frontend-dev.sh",
"dev:full": "docker compose up -d",
"stop": "docker compose down",
"logs": "docker compose logs -f",
"build": "docker compose build",
"clean": "docker compose down -v --remove-orphans"
},
"keywords": ["video", "streaming", "ai", "analysis", "docker"],
"author": "Media Analyzer Team",
"license": "MIT"
}

View File

@@ -0,0 +1,37 @@
#!/bin/bash
echo "🚀 Starting backend services only (excluding frontend)..."
echo "Frontend will run locally with 'ng serve' for faster development"
echo ""
# Start all services except frontend
docker compose up -d \
postgres \
redis \
backend \
celery-logo \
celery-default \
file-watcher \
nginx-rtmp
echo ""
echo "✅ Backend services started!"
echo ""
echo "📋 Services running:"
echo " - PostgreSQL: localhost:5432"
echo " - Redis: localhost:6379"
echo " - Backend API: localhost:8000"
echo " - RTMP Server: localhost:1935 (RTMP)"
echo " - HLS Streaming: localhost:8081 (HTTP)"
echo ""
echo "🔧 To start frontend development:"
echo " cd frontend"
echo " ng serve --proxy-config proxy.conf.json"
echo ""
echo "🌐 Frontend will be available at: http://localhost:4200"
echo ""
echo "📊 To check service status:"
echo " docker compose ps"
echo ""
echo "📜 To view logs:"
echo " docker compose logs -f [service-name]"

View File

@@ -0,0 +1,37 @@
#!/bin/bash
echo "🖥️ Starting Angular development server..."
echo ""
# Check if we're in the right directory
if [ ! -f "frontend/package.json" ]; then
echo "❌ Error: Run this script from the media-analyzer root directory"
exit 1
fi
# Check if backend services are running
if ! docker compose ps | grep -q "backend.*Up"; then
echo "⚠️ Warning: Backend services don't appear to be running"
echo " Run './start-backend-only.sh' first to start backend services"
echo ""
fi
cd frontend
# Check if node_modules exists
if [ ! -d "node_modules" ]; then
echo "📦 Installing npm dependencies..."
npm install
echo ""
fi
echo "🔥 Starting Angular dev server with hot reload..."
echo " Frontend: http://localhost:4200"
echo " Backend API: http://localhost:8000 (proxied)"
echo " HLS Streaming: http://localhost:8081 (proxied)"
echo ""
echo "💡 Changes to TypeScript files will auto-reload!"
echo ""
# Start Angular dev server with proxy
ng serve --proxy-config proxy.conf.json --host 0.0.0.0 --port 4200