integrate webcam support with real-time detection

This commit is contained in:
buenosairesam
2025-08-25 04:01:24 -03:00
parent c7822c1ec4
commit ad2b1a8a8f
38 changed files with 1252 additions and 458 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@ postgres_data
media/* media/*
!media/.gitkeep !media/.gitkeep
media-analyzer-src.tar.gz media-analyzer-src.tar.gz
media-analyzer/media

59
GEMINI.md Normal file
View File

@@ -0,0 +1,59 @@
# Media Analyzer
## Project Overview
This project is a real-time video streaming and AI analysis platform. It ingests RTMP video streams, processes them with computer vision AI models, and provides live analysis results through a responsive web dashboard.
The architecture is based on microservices and includes:
* **Backend**: A Django application that handles video stream management, AI processing, and WebSocket communication for real-time updates.
* **Frontend**: An Angular single-page application that provides a user interface for stream viewing and analysis visualization.
* **AI Processing**: A Python-based analysis engine that uses various adapters for different types of analysis, such as object detection, logo detection, and motion analysis. The engine can be configured to run locally, on a remote LAN worker, or in the cloud.
* **Streaming**: An NGINX server with the RTMP module ingests video streams and converts them to HLS for web playback.
* **Infrastructure**: The entire platform is containerized using Docker and can be deployed with Docker Compose for development or Kubernetes for production.
## Building and Running
### Docker Compose (Development)
To run the application in a development environment, use Docker Compose:
```bash
# Start all services
docker compose up
# Run migrations (in a separate terminal)
docker compose --profile tools up migrate
```
The application will be accessible at the following URLs:
* **Frontend**: `http://localhost:4200`
* **Backend API**: `http://localhost:8000`
* **RTMP Stream**: `rtmp://localhost:1935/live`
* **HLS Stream**: `http://localhost:8081/hls`
### Kubernetes (Production-like)
To deploy the application to a Kubernetes cluster, you can use the provided scripts:
```bash
# Build and push images to a local registry
./k8s/build-for-ctlptl.sh
# Deploy to Kubernetes
kubectl apply -k k8s/overlays/development
# Check deployment status
kubectl get pods -n media-analyzer
# Access the application via port forwarding
kubectl port-forward service/frontend -n media-analyzer 4200:80
```
## Development Conventions
* **Backend**: The backend is a Django application. Follow Django best practices for development.
* **Frontend**: The frontend is an Angular application. Follow Angular best practices for development.
* **AI Processing**: The AI processing engine is designed to be extensible. To add a new analysis capability, create a new adapter and integrate it with the `AnalysisEngine`.
* **Testing**: The project includes a `test_unified_ai.py` file, which suggests that there is a testing framework in place. Run existing tests and add new ones when making changes.

View File

@@ -0,0 +1,88 @@
You have a few options to speed up frontend development while keeping the backend services
running:
Option 1: Hybrid Development (Recommended)
Run frontend locally, backend in Docker:
# Terminal 1: Start backend services only
docker compose up postgres redis backend celery-logo celery-default file-watcher nginx-rtmp
# Terminal 2: Run Angular dev server locally
cd frontend && ng serve --host 0.0.0.0 --port 4200
Pros: Hot reload, fast rebuilds, best developer experience
Cons: Need to manage CORS and proxy config
Option 2: Docker Bind Mount + Manual Rebuild
Keep frontend container but rebuild manually:
# Build frontend assets locally
cd frontend && ng build --watch
# Or for production build
cd frontend && ng build --configuration production
# Assets go to dist/ and can be volume mounted to nginx
Option 3: Frontend Container with Volume Mount
Modify docker-compose.yml for dev:
frontend:
build:
context: ./frontend
target: development
volumes:
- ./frontend/src:/app/src # Mount source
- ./frontend/dist:/usr/share/nginx/html # Mount build output
command: sh -c "ng build --watch & nginx -g 'daemon off;'"
Option 4: Live Reload in Container
Add this to your frontend Dockerfile development stage:
FROM node:18 AS development
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
# Start both ng build --watch and nginx
CMD ["sh", "-c", "ng build --watch --output-path /usr/share/nginx/html & nginx -g 'daemon off;'"]
My Recommendation: Option 1
For your use case, I'd run:
# Keep backend running
docker compose up -d postgres redis backend celery-logo celery-default file-watcher nginx-rtmp
# Run frontend locally with proxy
cd frontend && ng serve --proxy-config proxy.conf.json
Create frontend/proxy.conf.json:
{
"/api/*": {
"target": "http://localhost:8000",
"secure": false,
"logLevel": "debug"
},
"/streaming/*": {
"target": "http://localhost:8000",
"secure": false
},
"/ws/*": {
"target": "http://localhost:8000",
"secure": false,
"ws": true
}
}
This gives you:
- ✅ Instant hot reload
- ✅ Fast TypeScript compilation
- ✅ Backend services running
- ✅ No container rebuilds
Would you like me to set up the proxy config for the hybrid approach?

View File

@@ -15,6 +15,8 @@ logger = logging.getLogger(__name__)
class AnalysisEngine: class AnalysisEngine:
"""Main analysis engine that orchestrates capability-specific adapters with execution strategies""" """Main analysis engine that orchestrates capability-specific adapters with execution strategies"""
_strategy_logged = False
def __init__(self): def __init__(self):
self.object_detector = None self.object_detector = None
self.logo_detector = None self.logo_detector = None
@@ -66,7 +68,9 @@ class AnalysisEngine:
logger.warning(f"Unknown strategy type {strategy_type}, falling back to local") logger.warning(f"Unknown strategy type {strategy_type}, falling back to local")
self.execution_strategy = strategy_configs['local']() self.execution_strategy = strategy_configs['local']()
logger.info(f"Configured execution strategy: {strategy_type}") if not AnalysisEngine._strategy_logged:
logger.info(f"Configured execution strategy: {strategy_type}")
AnalysisEngine._strategy_logged = True
except Exception as e: except Exception as e:
logger.error(f"Failed to configure execution strategy: {e}") logger.error(f"Failed to configure execution strategy: {e}")
@@ -76,8 +80,17 @@ class AnalysisEngine:
def extract_frame_from_segment(self, segment_path, timestamp=None): def extract_frame_from_segment(self, segment_path, timestamp=None):
"""Extract frame from video segment""" """Extract frame from video segment"""
try: try:
import os
logger.debug(f"Attempting to extract frame from: {segment_path}")
if not os.path.exists(segment_path):
logger.error(f"Segment file does not exist: {segment_path}")
return None
cap = cv2.VideoCapture(segment_path) cap = cv2.VideoCapture(segment_path)
if not cap.isOpened(): if not cap.isOpened():
logger.error(f"OpenCV failed to open: {segment_path}")
return None return None
# For TS segments, seeking is problematic, just read first frame # For TS segments, seeking is problematic, just read first frame
@@ -88,6 +101,8 @@ class AnalysisEngine:
if ret: if ret:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return Image.fromarray(frame_rgb) return Image.fromarray(frame_rgb)
else:
logger.error(f"Failed to read frame from {segment_path}")
return None return None
except Exception as e: except Exception as e:

View File

@@ -0,0 +1,23 @@
# Generated migration for stream_id to stream_key rename
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('ai_processing', '0002_make_provider_nullable'),
]
operations = [
migrations.RenameField(
model_name='videoanalysis',
old_name='stream_id',
new_name='stream_key',
),
migrations.RenameField(
model_name='processingqueue',
old_name='stream_id',
new_name='stream_key',
),
]

View File

@@ -0,0 +1,35 @@
# Generated by Django 5.0.6 on 2025-08-22 00:19
import ai_processing.models
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ai_processing', '0003_rename_stream_id_to_stream_key'),
]
operations = [
migrations.RemoveIndex(
model_name='processingqueue',
name='ai_processi_stream__60b035_idx',
),
migrations.RemoveIndex(
model_name='videoanalysis',
name='ai_processi_stream__b961da_idx',
),
migrations.AlterField(
model_name='videoanalysis',
name='confidence_threshold',
field=models.FloatField(default=ai_processing.models.get_default_confidence_threshold),
),
migrations.AddIndex(
model_name='processingqueue',
index=models.Index(fields=['stream_key'], name='ai_processi_stream__aecb17_idx'),
),
migrations.AddIndex(
model_name='videoanalysis',
index=models.Index(fields=['stream_key', 'timestamp'], name='ai_processi_stream__d99710_idx'),
),
]

View File

@@ -2,6 +2,15 @@ from django.db import models
import uuid import uuid
def get_default_confidence_threshold():
"""Get default confidence threshold from settings"""
try:
from django.conf import settings
return settings.LOGO_DETECTION_CONFIG['confidence_threshold']
except:
return 0.6 # Fallback if settings not available
class AnalysisProvider(models.Model): class AnalysisProvider(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True) name = models.CharField(max_length=100, unique=True)
@@ -28,20 +37,20 @@ class Brand(models.Model):
class VideoAnalysis(models.Model): class VideoAnalysis(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
stream_id = models.CharField(max_length=100) stream_key = models.CharField(max_length=100) # Use stream_key instead of stream_id
segment_path = models.CharField(max_length=500) segment_path = models.CharField(max_length=500)
timestamp = models.DateTimeField(auto_now_add=True) timestamp = models.DateTimeField(auto_now_add=True)
processing_time = models.FloatField(null=True) processing_time = models.FloatField(null=True)
provider = models.ForeignKey(AnalysisProvider, on_delete=models.CASCADE, null=True, blank=True) provider = models.ForeignKey(AnalysisProvider, on_delete=models.CASCADE, null=True, blank=True)
analysis_type = models.CharField(max_length=50) analysis_type = models.CharField(max_length=50)
confidence_threshold = models.FloatField(default=0.5) confidence_threshold = models.FloatField(default=get_default_confidence_threshold)
frame_timestamp = models.FloatField() frame_timestamp = models.FloatField()
external_request_id = models.CharField(max_length=200, null=True) external_request_id = models.CharField(max_length=200, null=True)
def to_dict(self): def to_dict(self):
return { return {
'id': str(self.id), 'id': str(self.id),
'stream_id': self.stream_id, 'stream_key': self.stream_key,
'timestamp': self.timestamp.isoformat(), 'timestamp': self.timestamp.isoformat(),
'processing_time': self.processing_time, 'processing_time': self.processing_time,
'analysis_type': self.analysis_type, 'analysis_type': self.analysis_type,
@@ -53,7 +62,7 @@ class VideoAnalysis(models.Model):
class Meta: class Meta:
indexes = [ indexes = [
models.Index(fields=['stream_id', 'timestamp']), models.Index(fields=['stream_key', 'timestamp']),
models.Index(fields=['analysis_type']), models.Index(fields=['analysis_type']),
] ]
@@ -128,7 +137,7 @@ class Brand(models.Model):
class ProcessingQueue(models.Model): class ProcessingQueue(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
stream_id = models.CharField(max_length=100) stream_key = models.CharField(max_length=100) # Use stream_key instead of stream_id
segment_path = models.CharField(max_length=500) segment_path = models.CharField(max_length=500)
priority = models.IntegerField(default=0) priority = models.IntegerField(default=0)
status = models.CharField(max_length=20, choices=[ status = models.CharField(max_length=20, choices=[
@@ -146,5 +155,5 @@ class ProcessingQueue(models.Model):
class Meta: class Meta:
indexes = [ indexes = [
models.Index(fields=['status', 'priority']), models.Index(fields=['status', 'priority']),
models.Index(fields=['stream_id']), models.Index(fields=['stream_key']),
] ]

View File

@@ -50,12 +50,12 @@ class VideoAnalyzer:
except Exception as e: except Exception as e:
logger.error(f"Error setting up providers: {e}") logger.error(f"Error setting up providers: {e}")
def queue_segment_analysis(self, stream_id, segment_path): def queue_segment_analysis(self, stream_key, segment_path):
"""Queue video segment for analysis""" """Queue video segment for analysis"""
try: try:
# Check if already queued # Check if already queued
existing = ProcessingQueue.objects.filter( existing = ProcessingQueue.objects.filter(
stream_id=stream_id, stream_key=stream_key,
segment_path=segment_path, segment_path=segment_path,
status__in=['pending', 'processing'] status__in=['pending', 'processing']
).exists() ).exists()
@@ -66,14 +66,14 @@ class VideoAnalyzer:
# Create queue item # Create queue item
queue_item = ProcessingQueue.objects.create( queue_item = ProcessingQueue.objects.create(
stream_id=stream_id, stream_key=stream_key,
segment_path=segment_path, segment_path=segment_path,
analysis_types=['logo_detection'], analysis_types=['logo_detection'],
priority=1 priority=1
) )
# Trigger async processing # Trigger async processing
process_video_segment.delay(stream_id, segment_path) process_video_segment.delay(stream_key, segment_path)
logger.info(f"Queued segment for analysis: {segment_path}") logger.info(f"Queued segment for analysis: {segment_path}")
return True return True

View File

@@ -11,13 +11,13 @@ channel_layer = get_channel_layer()
@shared_task(bind=True, queue='logo_detection') @shared_task(bind=True, queue='logo_detection')
def analyze_logo_detection(self, stream_id, segment_path): def analyze_logo_detection(self, stream_key, segment_path):
"""Dedicated task for logo detection analysis""" """Dedicated task for logo detection analysis"""
queue_item = None queue_item = None
try: try:
# Update queue status # Update queue status
queue_item = ProcessingQueue.objects.filter( queue_item = ProcessingQueue.objects.filter(
stream_id=stream_id, stream_key=stream_key,
segment_path=segment_path, segment_path=segment_path,
status='pending' status='pending'
).first() ).first()
@@ -50,20 +50,22 @@ def analyze_logo_detection(self, stream_id, segment_path):
queue_item.save() queue_item.save()
return {"error": "Failed to extract frame"} return {"error": "Failed to extract frame"}
# Analyze for logos only # Analyze for logos only - use configured threshold
analysis_results = engine.analyze_frame(frame, ['logo_detection'], confidence_threshold=0.3) from django.conf import settings
confidence = settings.LOGO_DETECTION_CONFIG['confidence_threshold']
analysis_results = engine.analyze_frame(frame, ['logo_detection'], confidence_threshold=confidence)
# Store results # Store results
provider_info = config_manager.get_provider_by_type(logo_config['provider_type']) provider_info = config_manager.get_provider_by_type(logo_config['provider_type'])
provider = AnalysisProvider.objects.get(id=provider_info['id']) provider = AnalysisProvider.objects.get(id=provider_info['id'])
analysis = VideoAnalysis.objects.create( analysis = VideoAnalysis.objects.create(
stream_id=stream_id, stream_key=stream_key,
segment_path=segment_path, segment_path=segment_path,
provider=provider, provider=provider,
analysis_type='logo_detection', analysis_type='logo_detection',
frame_timestamp=0.0, frame_timestamp=0.0,
confidence_threshold=0.3 confidence_threshold=confidence
) )
detections = [] detections = []
@@ -83,8 +85,10 @@ def analyze_logo_detection(self, stream_id, segment_path):
# Send results via WebSocket if detections found # Send results via WebSocket if detections found
if detections: if detections:
websocket_group = f"stream_{stream_key}"
logger.info(f"Sending websocket update to group: {websocket_group}")
async_to_sync(channel_layer.group_send)( async_to_sync(channel_layer.group_send)(
f"stream_{stream_id}", websocket_group,
{ {
"type": "analysis_update", "type": "analysis_update",
"analysis": analysis.to_dict() "analysis": analysis.to_dict()
@@ -96,11 +100,12 @@ def analyze_logo_detection(self, stream_id, segment_path):
queue_item.status = 'completed' queue_item.status = 'completed'
queue_item.save() queue_item.save()
if detections: result = {
logger.info(f"Logo detection: {len(detections)} detections found") "detections": len(detections),
else: "analysis_id": str(analysis.id),
logger.debug(f"Logo detection: no detections found") "brands": [d['label'] for d in detections] if detections else []
return {"detections": len(detections), "analysis_id": str(analysis.id)} }
return result
except Exception as e: except Exception as e:
logger.error(f"Logo detection failed for {segment_path}: {e}") logger.error(f"Logo detection failed for {segment_path}: {e}")
@@ -112,13 +117,13 @@ def analyze_logo_detection(self, stream_id, segment_path):
@shared_task(bind=True, queue='visual_analysis') @shared_task(bind=True, queue='visual_analysis')
def analyze_visual_properties(self, stream_id, segment_path): def analyze_visual_properties(self, stream_key, segment_path):
"""Dedicated task for visual property analysis""" """Dedicated task for visual property analysis"""
queue_item = None queue_item = None
try: try:
# Update queue status # Update queue status
queue_item = ProcessingQueue.objects.filter( queue_item = ProcessingQueue.objects.filter(
stream_id=stream_id, stream_key=stream_key,
segment_path=segment_path, segment_path=segment_path,
status='pending' status='pending'
).first() ).first()
@@ -145,7 +150,7 @@ def analyze_visual_properties(self, stream_id, segment_path):
# Store results (no provider needed for local visual analysis) # Store results (no provider needed for local visual analysis)
analysis = VideoAnalysis.objects.create( analysis = VideoAnalysis.objects.create(
stream_id=stream_id, stream_key=stream_key,
segment_path=segment_path, segment_path=segment_path,
provider=None, # Local analysis provider=None, # Local analysis
analysis_type='visual_analysis', analysis_type='visual_analysis',
@@ -165,7 +170,7 @@ def analyze_visual_properties(self, stream_id, segment_path):
# Send results via WebSocket # Send results via WebSocket
async_to_sync(channel_layer.group_send)( async_to_sync(channel_layer.group_send)(
f"stream_{stream_id}", f"stream_{stream_key}",
{ {
"type": "analysis_update", "type": "analysis_update",
"analysis": analysis.to_dict() "analysis": analysis.to_dict()
@@ -190,17 +195,17 @@ def analyze_visual_properties(self, stream_id, segment_path):
@shared_task(bind=True) @shared_task(bind=True)
def process_video_segment(self, stream_id, segment_path): def process_video_segment(self, stream_key, segment_path):
"""Main task that dispatches to specialized analysis tasks""" """Main task that dispatches to specialized analysis tasks"""
try: try:
# Dispatch to specialized queues based on available capabilities # Dispatch to specialized queues based on available capabilities
active_capabilities = config_manager.get_active_capabilities() active_capabilities = config_manager.get_active_capabilities()
if 'logo_detection' in active_capabilities: if 'logo_detection' in active_capabilities:
analyze_logo_detection.delay(stream_id, segment_path) analyze_logo_detection.delay(stream_key, segment_path)
# Visual analysis disabled for performance - only logo detection # Visual analysis disabled for performance - only logo detection
# analyze_visual_properties.delay(stream_id, segment_path) # analyze_visual_properties.delay(stream_key, segment_path)
return {"dispatched": True, "capabilities": active_capabilities} return {"dispatched": True, "capabilities": active_capabilities}
@@ -222,7 +227,7 @@ def reload_analysis_config():
@shared_task @shared_task
def analyze_frame_task(stream_id, segment_path, frame_timestamp=0.0): def analyze_frame_task(stream_key, segment_path, frame_timestamp=0.0):
"""Analyze a single frame from video segment""" """Analyze a single frame from video segment"""
try: try:
engine = AnalysisEngine() engine = AnalysisEngine()
@@ -251,7 +256,7 @@ def analyze_frame_task(stream_id, segment_path, frame_timestamp=0.0):
results = engine.analyze_frame(frame, ['logo_detection', 'visual_analysis']) results = engine.analyze_frame(frame, ['logo_detection', 'visual_analysis'])
return { return {
"stream_id": stream_id, "stream_key": stream_key,
"results": results, "results": results,
"frame_timestamp": frame_timestamp "frame_timestamp": frame_timestamp
} }

View File

@@ -26,7 +26,7 @@ application = ProtocolTypeRouter({
"http": django_asgi_app, "http": django_asgi_app,
"websocket": AuthMiddlewareStack( "websocket": AuthMiddlewareStack(
URLRouter([ URLRouter([
path("ws/stream/<str:stream_id>/", StreamAnalysisConsumer.as_asgi()), path("ws/stream/", StreamAnalysisConsumer.as_asgi()),
]) ])
), ),
}) })

View File

@@ -244,7 +244,7 @@ else:
# Logo Detection Configuration # Logo Detection Configuration
LOGO_DETECTION_CONFIG = { LOGO_DETECTION_CONFIG = {
'confidence_threshold': float(os.getenv('LOGO_CONFIDENCE_THRESHOLD', '0.3')), 'confidence_threshold': float(os.getenv('LOGO_CONFIDENCE_THRESHOLD', '0.6')),
'enabled_brands': os.getenv('ENABLED_BRANDS', 'Apple,Google,Nike,Coca-Cola,McDonald,Amazon').split(','), 'enabled_brands': os.getenv('ENABLED_BRANDS', 'Apple,Google,Nike,Coca-Cola,McDonald,Amazon').split(','),
'use_cloud_vision': USE_CLOUD_VISION, 'use_cloud_vision': USE_CLOUD_VISION,
} }

View File

@@ -21,7 +21,8 @@ from django.conf.urls.static import static
urlpatterns = [ urlpatterns = [
path('admin/', admin.site.urls), path('admin/', admin.site.urls),
path('api/', include('streaming.urls')), path('api/streaming/', include('streaming.urls')),
path('api/', include('api.urls')),
path('streaming/', include('streaming.urls')), path('streaming/', include('streaming.urls')),
] ]

View File

@@ -11,28 +11,16 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
"""WebSocket consumer for real-time analysis updates""" """WebSocket consumer for real-time analysis updates"""
async def connect(self): async def connect(self):
self.stream_id = self.scope['url_route']['kwargs']['stream_id'] # Initialize subscription set for dynamic stream groups
self.room_group_name = f'stream_{self.stream_id}' self.subscribed_streams = set()
# Join stream group
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept() await self.accept()
logger.info(f"WebSocket connected for stream {self.stream_id}") logger.info("WebSocket connected - ready to subscribe to streams")
# Send recent analysis results
await self.send_recent_analysis()
async def disconnect(self, close_code): async def disconnect(self, close_code):
# Leave stream group # Leave all subscribed stream groups
await self.channel_layer.group_discard( for stream_key in getattr(self, 'subscribed_streams', []):
self.room_group_name, await self.channel_layer.group_discard(f"stream_{stream_key}", self.channel_name)
self.channel_name logger.info("WebSocket disconnected")
)
logger.info(f"WebSocket disconnected for stream {self.stream_id}")
async def receive(self, text_data): async def receive(self, text_data):
"""Handle incoming WebSocket messages""" """Handle incoming WebSocket messages"""
@@ -45,10 +33,20 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
'type': 'pong', 'type': 'pong',
'timestamp': data.get('timestamp') 'timestamp': data.get('timestamp')
})) }))
elif message_type == 'subscribe':
stream_key = data.get('stream_id') # Frontend still sends 'stream_id' but it's actually stream_key
if stream_key and stream_key not in self.subscribed_streams:
self.subscribed_streams.add(stream_key)
await self.channel_layer.group_add(f"stream_{stream_key}", self.channel_name)
await self.send_recent_analysis(stream_key)
elif message_type == 'unsubscribe':
stream_key = data.get('stream_id') # Frontend still sends 'stream_id' but it's actually stream_key
if stream_key and stream_key in self.subscribed_streams:
self.subscribed_streams.remove(stream_key)
await self.channel_layer.group_discard(f"stream_{stream_key}", self.channel_name)
elif message_type == 'request_analysis': elif message_type == 'request_analysis':
# Trigger analysis if needed # Trigger analysis if needed
pass pass
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("Invalid JSON received") logger.error("Invalid JSON received")
@@ -60,21 +58,20 @@ class StreamAnalysisConsumer(AsyncWebsocketConsumer):
})) }))
@database_sync_to_async @database_sync_to_async
def get_recent_analysis(self): def get_recent_analysis(self, stream_key):
"""Get recent analysis results for stream""" """Get recent analysis results for a given stream"""
try: try:
analyses = VideoAnalysis.objects.filter( analyses = VideoAnalysis.objects.filter(
stream_id=self.stream_id stream_key=stream_key
).order_by('-timestamp')[:5] ).order_by('-timestamp')[:5]
return [analysis.to_dict() for analysis in analyses] return [analysis.to_dict() for analysis in analyses]
except Exception as e: except Exception as e:
logger.error(f"Error getting recent analysis: {e}") logger.error(f"Error getting recent analysis for {stream_key}: {e}")
return [] return []
async def send_recent_analysis(self): async def send_recent_analysis(self, stream_key):
"""Send recent analysis results to client""" """Send recent analysis results to client for the given stream"""
recent_analyses = await self.get_recent_analysis() recent_analyses = await self.get_recent_analysis(stream_key)
if recent_analyses: if recent_analyses:
await self.send(text_data=json.dumps({ await self.send(text_data=json.dumps({
'type': 'recent_analysis', 'type': 'recent_analysis',

View File

@@ -1,6 +1,7 @@
import ffmpeg import ffmpeg
import logging import logging
import subprocess import subprocess
import platform
from pathlib import Path from pathlib import Path
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -32,5 +33,46 @@ class FFmpegHandler:
) )
return ffmpeg.run_async(output, pipe_stdout=True, pipe_stderr=True) return ffmpeg.run_async(output, pipe_stdout=True, pipe_stderr=True)
def webcam_to_hls(self, device_index: int, output_path: str, width: int = 640, height: int = 480, fps: int = 30) -> subprocess.Popen:
"""Convert webcam stream to HLS (cross-platform)"""
system = platform.system().lower()
if system == 'windows':
# Windows: DirectShow
stream = ffmpeg.input(
f'video="Integrated Camera"',
f='dshow',
video_size=f'{width}x{height}',
framerate=fps
)
elif system == 'darwin': # macOS
# macOS: AVFoundation
stream = ffmpeg.input(
f'{device_index}',
f='avfoundation',
video_size=f'{width}x{height}',
framerate=fps
)
else: # Linux and others
# Linux: Video4Linux2
stream = ffmpeg.input(
f'/dev/video{device_index}',
f='v4l2',
s=f'{width}x{height}',
framerate=fps
)
output = ffmpeg.output(
stream, output_path,
vcodec='libx264',
preset='ultrafast', # Fast encoding for real-time
tune='zerolatency', # Low latency
f='hls',
hls_time=2, # Short segments for responsiveness
hls_list_size=10,
hls_flags='delete_segments'
)
return ffmpeg.run_async(output, pipe_stdout=True, pipe_stderr=True)
# Singleton # Singleton
ffmpeg_handler = FFmpegHandler() ffmpeg_handler = FFmpegHandler()

View File

@@ -0,0 +1,84 @@
import os
import time
import logging
from pathlib import Path
from django.conf import settings
from ai_processing.processors.video_analyzer import VideoAnalyzer
from .models import VideoStream, StreamStatus
logger = logging.getLogger(__name__)
class HLSFileWatcher:
"""Watch for new HLS segment files and trigger analysis"""
def __init__(self, media_dir=None, poll_interval=1.0):
self.media_dir = Path(media_dir or settings.MEDIA_ROOT)
self.poll_interval = poll_interval
self.processed_files = set()
self.analyzer = VideoAnalyzer()
def get_stream_key_from_filename(self, filename):
"""Extract stream_key from filename: 'stream_key-segment_number.ts' -> 'stream_key'"""
if not filename.endswith('.ts'):
return None
base_name = filename.rsplit('.', 1)[0] # Remove .ts extension
stream_key = base_name.rsplit('-', 1)[0] # Remove last segment: "-123"
return stream_key if stream_key else None
def process_new_segment(self, file_path):
"""Process a new HLS segment file"""
try:
# Determine the active stream from the database
active_stream = VideoStream.objects.filter(status=StreamStatus.ACTIVE).first()
if not active_stream:
logger.warning(f"File watcher: No active stream found, skipping segment {file_path.name}")
return
stream_key = active_stream.stream_key
logger.info(f"File watcher: Processing new segment {file_path.name} (stream: {stream_key})")
# Queue for analysis
self.analyzer.queue_segment_analysis(stream_key, str(file_path))
logger.info(f"File watcher: Queued segment for analysis: {file_path.name}")
except Exception as e:
logger.error(f"File watcher: Error processing {file_path}: {e}")
import traceback
logger.error(f"File watcher: Traceback: {traceback.format_exc()}")
def scan_for_new_files(self):
"""Scan for new .ts files in the media directory"""
try:
if not self.media_dir.exists():
return
current_files = set()
for ts_file in self.media_dir.glob("*.ts"):
if ts_file.is_file():
current_files.add(ts_file)
# Find new files
new_files = current_files - self.processed_files
for new_file in new_files:
self.process_new_segment(new_file)
self.processed_files.add(new_file)
except Exception as e:
logger.error(f"File watcher: Error scanning directory: {e}")
def start_watching(self):
"""Start the file watching loop"""
logger.info(f"File watcher: Starting to watch {self.media_dir}")
# Initial scan to catch existing files
self.scan_for_new_files()
while True:
try:
self.scan_for_new_files()
time.sleep(self.poll_interval)
except KeyboardInterrupt:
logger.info("File watcher: Stopped by user")
break
except Exception as e:
logger.error(f"File watcher: Unexpected error: {e}")
time.sleep(self.poll_interval)

View File

@@ -0,0 +1,30 @@
from django.core.management.base import BaseCommand
from streaming.file_watcher import HLSFileWatcher
class Command(BaseCommand):
help = 'Watch for new HLS segment files and trigger analysis'
def add_arguments(self, parser):
parser.add_argument(
'--poll-interval',
type=float,
default=1.0,
help='Polling interval in seconds (default: 1.0)'
)
def handle(self, *args, **options):
poll_interval = options['poll_interval']
self.stdout.write(
self.style.SUCCESS(f'Starting HLS file watcher (poll interval: {poll_interval}s)')
)
watcher = HLSFileWatcher(poll_interval=poll_interval)
try:
watcher.start_watching()
except KeyboardInterrupt:
self.stdout.write(
self.style.SUCCESS('HLS file watcher stopped')
)

View File

@@ -4,6 +4,9 @@ from pathlib import Path
from django.conf import settings from django.conf import settings
from .models import VideoStream, StreamStatus from .models import VideoStream, StreamStatus
from .ffmpeg_handler import ffmpeg_handler from .ffmpeg_handler import ffmpeg_handler
import threading
import os
import signal
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -43,6 +46,13 @@ class RTMPSourceAdapter(VideoSourceAdapter):
try: try:
self.update_stream_status(StreamStatus.STARTING) self.update_stream_status(StreamStatus.STARTING)
# Check if any other stream is active (only one stream allowed)
active_streams = VideoStream.objects.filter(status=StreamStatus.ACTIVE).exclude(id=self.stream.id)
if active_streams.exists():
logger.warning(f"Cannot start RTMP - another stream is active: {active_streams.first().name}")
self.update_stream_status(StreamStatus.ERROR)
return False
# Files go directly in media directory # Files go directly in media directory
media_dir = Path(settings.MEDIA_ROOT) media_dir = Path(settings.MEDIA_ROOT)
@@ -53,9 +63,14 @@ class RTMPSourceAdapter(VideoSourceAdapter):
# Start FFmpeg conversion # Start FFmpeg conversion
self.process = ffmpeg_handler.rtmp_to_hls(rtmp_url, playlist_path) self.process = ffmpeg_handler.rtmp_to_hls(rtmp_url, playlist_path)
# Persist FFmpeg PID for stop operations
try:
pid_file = media_dir / f'{self.stream.stream_key}.pid'
with pid_file.open('w') as f:
f.write(str(self.process.pid))
except Exception as e:
logger.error(f"RTMPSourceAdapter: Failed to write PID file: {e}")
# HLS URL is now generated dynamically from settings # HLS URL is now generated dynamically from settings
self.update_stream_status(StreamStatus.ACTIVE) self.update_stream_status(StreamStatus.ACTIVE)
logger.info(f"Started RTMP processing for stream {self.stream.id}") logger.info(f"Started RTMP processing for stream {self.stream.id}")
return True return True
@@ -68,15 +83,31 @@ class RTMPSourceAdapter(VideoSourceAdapter):
def stop_processing(self) -> bool: def stop_processing(self) -> bool:
try: try:
self.update_stream_status(StreamStatus.STOPPING) self.update_stream_status(StreamStatus.STOPPING)
media_dir = Path(settings.MEDIA_ROOT)
pid_file = media_dir / f'{self.stream.stream_key}.pid'
# Attempt to terminate in-memory process
if self.process and self.process.poll() is None: if self.process and self.process.poll() is None:
self.process.terminate() self.process.terminate()
self.process.wait(timeout=10) try:
self.process.wait(timeout=10)
except Exception:
pass
# Fallback: terminate by PID file
elif pid_file.exists():
try:
pid = int(pid_file.read_text())
os.kill(pid, signal.SIGTERM)
except Exception as kill_err:
logger.error(f"RTMPSourceAdapter: Failed to kill PID {pid}: {kill_err}")
# Cleanup PID file
if pid_file.exists():
try:
pid_file.unlink()
except Exception as unlink_err:
logger.error(f"RTMPSourceAdapter: Failed to remove PID file: {unlink_err}")
self.update_stream_status(StreamStatus.INACTIVE) self.update_stream_status(StreamStatus.INACTIVE)
logger.info(f"Stopped RTMP processing for stream {self.stream.id}") logger.info(f"Stopped RTMP processing for stream {self.stream.id}")
return True return True
except Exception as e: except Exception as e:
logger.error(f"Failed to stop RTMP processing: {e}") logger.error(f"Failed to stop RTMP processing: {e}")
self.update_stream_status(StreamStatus.ERROR) self.update_stream_status(StreamStatus.ERROR)
@@ -125,6 +156,119 @@ class FileSourceAdapter(VideoSourceAdapter):
return str(media_dir / f'{self.stream.stream_key}.m3u8') return str(media_dir / f'{self.stream.stream_key}.m3u8')
class WebcamSourceAdapter(VideoSourceAdapter):
"""Adapter for webcam streams"""
def start_processing(self) -> bool:
try:
logger.info(f"Starting webcam processing for stream {self.stream.id} with key {self.stream.stream_key}")
self.update_stream_status(StreamStatus.STARTING)
# Check if any other stream is active (only one stream allowed)
active_streams = VideoStream.objects.filter(status=StreamStatus.ACTIVE).exclude(id=self.stream.id)
if active_streams.exists():
logger.warning(f"Cannot start webcam - another stream is active: {active_streams.first().name}")
self.update_stream_status(StreamStatus.ERROR)
return False
# Files go directly in media directory
media_dir = Path(settings.MEDIA_ROOT)
playlist_path = str(media_dir / f'{self.stream.stream_key}.m3u8')
logger.info(f"Webcam playlist path: {playlist_path}")
# Default to webcam 0
device_index = 0
# Start FFmpeg conversion
logger.info(f"Starting FFmpeg webcam conversion with device {device_index}")
self.process = ffmpeg_handler.webcam_to_hls(device_index, playlist_path)
# Check if FFmpeg process started successfully
if self.process.poll() is not None:
# Process already exited - get error details
try:
stdout, stderr = self.process.communicate(timeout=2)
error_msg = stderr.decode('utf-8') if stderr else "Unknown FFmpeg error"
logger.error(f"FFmpeg failed to start webcam: {error_msg}")
except Exception as comm_error:
logger.error(f"FFmpeg failed and couldn't read error: {comm_error}")
error_msg = "FFmpeg process failed to start"
self.update_stream_status(StreamStatus.ERROR)
raise Exception(f"Webcam initialization failed: {error_msg}")
logger.info(f"FFmpeg process started successfully with PID: {self.process.pid}")
# Persist FFmpeg PID for stop operations
try:
pid_file = media_dir / f'{self.stream.stream_key}.pid'
with pid_file.open('w') as f:
f.write(str(self.process.pid))
except Exception as e:
logger.error(f"WebcamSourceAdapter: Failed to write PID file: {e}")
self.update_stream_status(StreamStatus.ACTIVE)
logger.info(f"Started webcam processing for stream {self.stream.id}")
# Monitor FFmpeg process and handle unexpected termination
threading.Thread(target=self._monitor_webcam, daemon=True).start()
return True
except Exception as e:
logger.error(f"Failed to start webcam processing: {e}")
logger.exception(f"Full exception details:")
self.update_stream_status(StreamStatus.ERROR)
return False
def stop_processing(self) -> bool:
try:
self.update_stream_status(StreamStatus.STOPPING)
media_dir = Path(settings.MEDIA_ROOT)
pid_file = media_dir / f'{self.stream.stream_key}.pid'
# Attempt to terminate in-memory process
if self.process and self.process.poll() is None:
self.process.terminate()
try:
self.process.wait(timeout=10)
except Exception:
pass
# Fallback: terminate by PID file
elif pid_file.exists():
try:
pid = int(pid_file.read_text())
os.kill(pid, signal.SIGTERM)
except Exception as kill_err:
logger.error(f"WebcamSourceAdapter: Failed to kill PID {pid}: {kill_err}")
# Cleanup PID file
if pid_file.exists():
try:
pid_file.unlink()
except Exception as unlink_err:
logger.error(f"WebcamSourceAdapter: Failed to remove PID file: {unlink_err}")
self.update_stream_status(StreamStatus.INACTIVE)
logger.info(f"Stopped webcam processing for stream {self.stream.id}")
return True
except Exception as e:
logger.error(f"Failed to stop webcam processing: {e}")
self.update_stream_status(StreamStatus.ERROR)
return False
def get_hls_output_path(self) -> str:
media_dir = Path(settings.MEDIA_ROOT)
return str(media_dir / f'{self.stream.stream_key}.m3u8')
def _monitor_webcam(self):
"""Monitor the FFmpeg webcam process and update stream status on exit"""
try:
exit_code = self.process.wait()
if exit_code != 0:
logger.error(f"FFmpeg webcam process terminated unexpectedly with code {exit_code}")
new_status = StreamStatus.ERROR
else:
logger.info(f"FFmpeg webcam process terminated normally with code {exit_code}")
new_status = StreamStatus.INACTIVE
self.update_stream_status(new_status)
except Exception as e:
logger.error(f"Error monitoring FFmpeg webcam process: {e}")
class SourceAdapterFactory: class SourceAdapterFactory:
"""Factory for creating video source adapters""" """Factory for creating video source adapters"""
@@ -133,6 +277,7 @@ class SourceAdapterFactory:
adapters = { adapters = {
'rtmp': RTMPSourceAdapter, 'rtmp': RTMPSourceAdapter,
'file': FileSourceAdapter, 'file': FileSourceAdapter,
'webcam': WebcamSourceAdapter,
} }
adapter_class = adapters.get(stream.source_type) adapter_class = adapters.get(stream.source_type)

View File

@@ -4,8 +4,9 @@ from . import views
urlpatterns = [ urlpatterns = [
path('streams/', views.list_streams, name='list_streams'), path('streams/', views.list_streams, name='list_streams'),
path('streams/create/', views.create_stream, name='create_stream'), path('streams/create/', views.create_stream, name='create_stream'),
path('streams/<int:stream_id>/start/', views.start_stream, name='start_stream'), path('streams/webcam/start/', views.start_webcam_stream, name='start_webcam_stream'),
path('streams/<int:stream_id>/stop/', views.stop_stream, name='stop_stream'), path('streams/<str:stream_key>/start/', views.start_stream, name='start_stream'),
path('streams/<str:stream_id>/analyze/', views.trigger_analysis, name='trigger_analysis'), path('streams/<str:stream_key>/stop/', views.stop_stream, name='stop_stream'),
path('hls/<str:filename>', views.serve_hls_file, name='serve_hls_file'), path('streams/<str:stream_key>/analyze/', views.trigger_analysis, name='trigger_analysis'),
path('<str:filename>', views.serve_hls_file, name='serve_hls_file'),
] ]

View File

@@ -17,15 +17,30 @@ logger = logging.getLogger(__name__)
@csrf_exempt @csrf_exempt
@require_http_methods(["POST"]) @require_http_methods(["POST"])
def create_stream(request): def create_stream(request):
"""Create new stream""" """Create or update RTMP stream (single stream pattern like webcam)"""
try: try:
data = json.loads(request.body) data = json.loads(request.body)
stream = VideoStream.objects.create( source_type = data.get('source_type', 'rtmp')
name=data['name'],
source_type=data.get('source_type', 'rtmp'), # Look for existing stream of this type first
processing_mode=data.get('processing_mode', 'live'), existing_stream = VideoStream.objects.filter(source_type=source_type).first()
stream_key=str(uuid.uuid4())
) if existing_stream:
# Update existing stream
existing_stream.name = data['name']
existing_stream.processing_mode = data.get('processing_mode', 'live')
existing_stream.save()
stream = existing_stream
logger.info(f"Updated existing {source_type} stream: {stream.id}")
else:
# Create new stream
stream = VideoStream.objects.create(
name=data['name'],
source_type=source_type,
processing_mode=data.get('processing_mode', 'live'),
stream_key=str(uuid.uuid4())
)
logger.info(f"Created new {source_type} stream: {stream.id}")
return JsonResponse({ return JsonResponse({
'id': stream.id, 'id': stream.id,
@@ -35,7 +50,7 @@ def create_stream(request):
'stream_key': stream.stream_key, 'stream_key': stream.stream_key,
'status': stream.status, 'status': stream.status,
'hls_playlist_url': f"{settings.HLS_BASE_URL}{settings.HLS_ENDPOINT_PATH}{stream.stream_key}.m3u8" if stream.status == 'active' else None, 'hls_playlist_url': f"{settings.HLS_BASE_URL}{settings.HLS_ENDPOINT_PATH}{stream.stream_key}.m3u8" if stream.status == 'active' else None,
'rtmp_ingest_url': f"rtmp://{request.get_host().split(':')[0]}:{settings.RTMP_PORT}/live/{stream.stream_key}", 'rtmp_ingest_url': f"rtmp://{request.get_host().split(':')[0]}:{settings.RTMP_PORT}/live",
'created_at': stream.created_at.isoformat() 'created_at': stream.created_at.isoformat()
}) })
@@ -52,9 +67,10 @@ def list_streams(request):
'name': s.name, 'name': s.name,
'source_type': s.source_type, 'source_type': s.source_type,
'processing_mode': s.processing_mode, 'processing_mode': s.processing_mode,
'stream_key': s.stream_key,
'status': s.status, 'status': s.status,
'hls_playlist_url': f"{settings.HLS_BASE_URL}{settings.HLS_ENDPOINT_PATH}{s.stream_key}.m3u8" if s.status == 'active' else None, 'hls_playlist_url': f"{settings.HLS_BASE_URL}{settings.HLS_ENDPOINT_PATH}{s.stream_key}.m3u8" if s.status == 'active' else None,
'rtmp_ingest_url': f"rtmp://{request.get_host().split(':')[0]}:{settings.RTMP_PORT}/live/{s.stream_key}", 'rtmp_ingest_url': f"rtmp://{request.get_host().split(':')[0]}:{settings.RTMP_PORT}/live",
'created_at': s.created_at.isoformat() 'created_at': s.created_at.isoformat()
} for s in streams] } for s in streams]
}) })
@@ -62,9 +78,9 @@ def list_streams(request):
@csrf_exempt @csrf_exempt
@require_http_methods(["POST"]) @require_http_methods(["POST"])
def start_stream(request, stream_id): def start_stream(request, stream_key):
"""Start stream processing""" """Start stream processing"""
stream = get_object_or_404(VideoStream, id=stream_id) stream = get_object_or_404(VideoStream, stream_key=stream_key)
try: try:
adapter = SourceAdapterFactory.create_adapter(stream) adapter = SourceAdapterFactory.create_adapter(stream)
@@ -79,15 +95,15 @@ def start_stream(request, stream_id):
return JsonResponse({'error': 'Failed to start stream'}, status=500) return JsonResponse({'error': 'Failed to start stream'}, status=500)
except Exception as e: except Exception as e:
logger.error(f"Error starting stream {stream_id}: {e}") logger.error(f"Error starting stream {stream_key}: {e}")
return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt @csrf_exempt
@require_http_methods(["POST"]) @require_http_methods(["POST"])
def stop_stream(request, stream_id): def stop_stream(request, stream_key):
"""Stop stream processing""" """Stop stream processing"""
stream = get_object_or_404(VideoStream, id=stream_id) stream = get_object_or_404(VideoStream, stream_key=stream_key)
try: try:
adapter = SourceAdapterFactory.create_adapter(stream) adapter = SourceAdapterFactory.create_adapter(stream)
@@ -99,7 +115,7 @@ def stop_stream(request, stream_id):
return JsonResponse({'error': 'Failed to stop stream'}, status=500) return JsonResponse({'error': 'Failed to stop stream'}, status=500)
except Exception as e: except Exception as e:
logger.error(f"Error stopping stream {stream_id}: {e}") logger.error(f"Error stopping stream {stream_key}: {e}")
return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': str(e)}, status=500)
@@ -115,21 +131,27 @@ def serve_hls_file(request, filename):
# Trigger analysis for new .ts segments # Trigger analysis for new .ts segments
if filename.endswith('.ts'): if filename.endswith('.ts'):
logger.info(f"Processing .ts file request: {filename}")
try: try:
# Extract stream ID from UUID-based filename: 43606ec7-786c-4f7d-acf3-95981f9e5ebe-415.ts # Extract stream_key from filename: "stream_key-segment_number.ts" -> "stream_key"
if '-' in filename: # Example: "69f79422-5816-4cf0-9f44-0ac1421b8b8e-123.ts" -> "69f79422-5816-4cf0-9f44-0ac1421b8b8e"
# Split by dash and take first 5 parts (UUID format) base_name = filename.rsplit('.', 1)[0] # Remove .ts extension
parts = filename.split('-') stream_key = base_name.rsplit('-', 1)[0] # Remove last segment: "-123"
if len(parts) >= 5: logger.info(f"Parsed stream_key: {stream_key} from filename: {filename}")
stream_id = '-'.join(parts[:5]) # Reconstruct UUID
# Queue for analysis if stream_key:
analyzer = VideoAnalyzer() # Queue for analysis
analyzer.queue_segment_analysis(stream_id, file_path) logger.info(f"Attempting to queue analysis for {filename}")
logger.info(f"Queued segment for analysis: {filename} (stream: {stream_id})") analyzer = VideoAnalyzer()
analyzer.queue_segment_analysis(stream_key, file_path)
logger.info(f"Queued segment for analysis: {filename} (stream: {stream_key})")
else:
logger.warning(f"No stream_key extracted from {filename}")
except Exception as e: except Exception as e:
logger.error(f"Error queuing analysis for {filename}: {e}") logger.error(f"Error queuing analysis for {filename}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Determine content type # Determine content type
if filename.endswith('.m3u8'): if filename.endswith('.m3u8'):
@@ -148,23 +170,25 @@ def serve_hls_file(request, filename):
@csrf_exempt @csrf_exempt
@require_http_methods(["POST"]) @require_http_methods(["POST"])
def trigger_analysis(request, stream_id): def trigger_analysis(request, stream_key):
"""Manually trigger analysis for testing""" """Manually trigger analysis for testing"""
try: try:
data = json.loads(request.body) if request.body else {} data = json.loads(request.body) if request.body else {}
segment_path = data.get('segment_path') segment_path = data.get('segment_path')
if not segment_path: if not segment_path:
# Find latest segment # Find latest segment in media directory
media_dir = os.path.join(settings.BASE_DIR.parent.parent, 'media') media_dir = settings.MEDIA_ROOT
ts_files = [f for f in os.listdir(media_dir) if f.endswith('.ts')] ts_files = [f for f in os.listdir(media_dir) if f.endswith('.ts')]
if ts_files: if ts_files:
# Sort by filename to get the latest segment
ts_files.sort()
segment_path = os.path.join(media_dir, ts_files[-1]) segment_path = os.path.join(media_dir, ts_files[-1])
else: else:
return JsonResponse({'error': 'No segments found'}, status=404) return JsonResponse({'error': 'No segments found'}, status=404)
analyzer = VideoAnalyzer() analyzer = VideoAnalyzer()
success = analyzer.queue_segment_analysis(stream_id, segment_path) success = analyzer.queue_segment_analysis(stream_key, segment_path)
if success: if success:
return JsonResponse({'message': 'Analysis triggered', 'segment': segment_path}) return JsonResponse({'message': 'Analysis triggered', 'segment': segment_path})
@@ -174,3 +198,73 @@ def trigger_analysis(request, stream_id):
except Exception as e: except Exception as e:
logger.error(f"Error triggering analysis: {e}") logger.error(f"Error triggering analysis: {e}")
return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': str(e)}, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def start_webcam_stream(request):
"""Start or reuse existing webcam stream"""
try:
# Look for existing webcam stream first
webcam_stream = VideoStream.objects.filter(source_type='webcam').first()
if not webcam_stream:
# Create new webcam stream
webcam_stream = VideoStream.objects.create(
name='Webcam Stream',
source_type='webcam',
processing_mode='live',
stream_key=str(uuid.uuid4())
)
logger.info(f"Created new webcam stream: {webcam_stream.id}")
# Check if another stream is active
active_streams = VideoStream.objects.filter(status=StreamStatus.ACTIVE).exclude(id=webcam_stream.id)
if active_streams.exists():
other = active_streams.first()
return JsonResponse({
'error': f'Another stream is active: {other.name}',
'active_stream_key': other.stream_key,
'active_stream_name': other.name
}, status=409)
# Start the webcam stream if not already active
if webcam_stream.status != StreamStatus.ACTIVE:
adapter = SourceAdapterFactory.create_adapter(webcam_stream)
success = adapter.start_processing()
if not success:
return JsonResponse({'error': 'Failed to start webcam'}, status=500)
# Wait for HLS playlist to be ready before returning
import time
playlist_path = os.path.join(settings.MEDIA_ROOT, f"{webcam_stream.stream_key}.m3u8")
max_wait_time = 10 # seconds
wait_interval = 0.5 # seconds
elapsed_time = 0
logger.info(f"Waiting for HLS playlist to be ready: {playlist_path}")
while elapsed_time < max_wait_time:
if os.path.exists(playlist_path) and os.path.getsize(playlist_path) > 0:
logger.info(f"HLS playlist ready after {elapsed_time:.1f}s")
break
time.sleep(wait_interval)
elapsed_time += wait_interval
if not os.path.exists(playlist_path):
logger.warning(f"HLS playlist not ready after {max_wait_time}s, returning anyway")
return JsonResponse({
'id': webcam_stream.id,
'name': webcam_stream.name,
'source_type': webcam_stream.source_type,
'processing_mode': webcam_stream.processing_mode,
'stream_key': webcam_stream.stream_key,
'status': webcam_stream.status,
'hls_playlist_url': f"{settings.HLS_BASE_URL}{settings.HLS_ENDPOINT_PATH}{webcam_stream.stream_key}.m3u8",
'created_at': webcam_stream.created_at.isoformat()
})
except Exception as e:
logger.error(f"Error starting webcam stream: {e}")
return JsonResponse({'error': str(e)}, status=500)

View File

@@ -1,57 +0,0 @@
#!/usr/bin/env python
"""Quick test script to verify AI pipeline works"""
import os
import django
import sys
from pathlib import Path
# Add the backend directory to Python path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
# Configure Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()
from ai_processing.processors.video_analyzer import VideoAnalyzer
from ai_processing.models import AnalysisProvider
from PIL import Image
import numpy as np
def test_ai_pipeline():
print("🧪 Testing AI Pipeline...")
# Check providers
providers = AnalysisProvider.objects.all()
print(f"📊 Found {providers.count()} providers:")
for p in providers:
print(f" - {p.name} ({p.provider_type}) - Active: {p.active}")
# Create test analyzer
analyzer = VideoAnalyzer()
# Create a test image (simple colored rectangle)
print("\n🖼️ Creating test image...")
test_image = Image.new('RGB', (640, 480), color='red')
# Test synchronous analysis
print("🔍 Running synchronous analysis...")
try:
result = analyzer.analyze_frame_sync(test_image)
print(f"✅ Analysis result: {result}")
if 'error' in result:
print(f"❌ Error: {result['error']}")
else:
print(f"✅ Found {len(result.get('logos', []))} logo detections")
for logo in result.get('logos', []):
print(f" - {logo['label']}: {logo['confidence']:.3f}")
except Exception as e:
print(f"❌ Analysis failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_ai_pipeline()

View File

@@ -1,88 +0,0 @@
#!/usr/bin/env python
"""Test frontend integration by creating sample analysis data"""
import os
import django
import sys
from pathlib import Path
# Configure Django
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()
from ai_processing.models import VideoAnalysis, DetectionResult, VisualAnalysis, AnalysisProvider
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def create_sample_analysis():
"""Create sample analysis data for testing frontend"""
print("🎯 Creating sample analysis data...")
# Get CLIP provider
provider = AnalysisProvider.objects.filter(provider_type='local_clip').first()
# Create analysis
analysis = VideoAnalysis.objects.create(
stream_id='test_stream',
segment_path='/fake/path.ts',
provider=provider,
analysis_type='logo_detection',
frame_timestamp=0.0,
confidence_threshold=0.3
)
# Create sample detections
DetectionResult.objects.create(
analysis=analysis,
label='Apple',
confidence=0.85,
bbox_x=0.2,
bbox_y=0.3,
bbox_width=0.3,
bbox_height=0.2,
detection_type='logo'
)
DetectionResult.objects.create(
analysis=analysis,
label='Google',
confidence=0.72,
bbox_x=0.5,
bbox_y=0.1,
bbox_width=0.25,
bbox_height=0.15,
detection_type='logo'
)
# Create visual analysis
VisualAnalysis.objects.create(
analysis=analysis,
dominant_colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]],
brightness_level=0.7,
contrast_level=0.5,
saturation_level=0.8
)
print(f"✅ Created analysis: {analysis.to_dict()}")
# Try to send via WebSocket
try:
channel_layer = get_channel_layer()
if channel_layer:
async_to_sync(channel_layer.group_send)(
"stream_test_stream",
{
"type": "analysis_update",
"analysis": analysis.to_dict()
}
)
print("📡 Sent WebSocket update")
else:
print("⚠️ No channel layer configured")
except Exception as e:
print(f"❌ WebSocket send failed: {e}")
if __name__ == "__main__":
create_sample_analysis()

View File

@@ -1,92 +0,0 @@
#!/usr/bin/env python
"""Test the complete pipeline: AI analysis -> WebSocket -> Frontend"""
import os
import django
import sys
import json
import asyncio
from pathlib import Path
# Add the backend directory to Python path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
# Configure Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'media_analyzer.settings.development')
django.setup()
from ai_processing.tasks import process_video_segment
from ai_processing.models import VideoAnalysis, AnalysisProvider
from PIL import Image, ImageDraw, ImageFont
import tempfile
def create_test_image_with_apple_logo():
"""Create a test image with 'Apple' text as logo simulation"""
img = Image.new('RGB', (640, 480), color='white')
draw = ImageDraw.Draw(img)
# Draw "Apple" text in the center
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", 48)
except:
font = ImageFont.load_default()
text = "Apple iPhone"
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = (640 - text_width) // 2
y = (480 - text_height) // 2
draw.text((x, y), text, fill='black', font=font)
return img
def test_full_pipeline():
print("🧪 Testing Complete Pipeline...")
# Create test image
print("🖼️ Creating test image with Apple logo simulation...")
test_image = create_test_image_with_apple_logo()
# Save to temporary file as a fake video segment
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp_file:
test_image.save(tmp_file.name)
fake_segment_path = tmp_file.name
print(f"💾 Saved test image to: {fake_segment_path}")
# Test the processing task directly
print("🚀 Triggering analysis task...")
try:
result = process_video_segment('test_stream', fake_segment_path)
print(f"✅ Task result: {result}")
# Check if analysis was stored
analysis = VideoAnalysis.objects.filter(stream_id='test_stream').last()
if analysis:
print(f"📊 Analysis stored: {analysis.to_dict()}")
detections = analysis.detections.all()
print(f"🎯 Found {detections.count()} detections:")
for detection in detections:
print(f" - {detection.label}: {detection.confidence:.3f}")
else:
print("❌ No analysis found in database")
except Exception as e:
print(f"❌ Task failed: {e}")
import traceback
traceback.print_exc()
finally:
# Cleanup
try:
os.unlink(fake_segment_path)
except:
pass
if __name__ == "__main__":
test_full_pipeline()

View File

@@ -9,6 +9,12 @@ services:
volumes: volumes:
- ./backend:/app - ./backend:/app
- ./media:/app/media - ./media:/app/media
# Linux webcam support (comment out on macOS/Windows if needed)
devices:
- /dev/video0:/dev/video0
group_add:
- video
user: "${UID:-1000}:${GID:-1000}"
environment: environment:
- DEBUG=1 - DEBUG=1
- DB_HOST=postgres - DB_HOST=postgres
@@ -17,6 +23,8 @@ services:
- DB_PASSWORD=media_pass - DB_PASSWORD=media_pass
- REDIS_HOST=redis - REDIS_HOST=redis
- REDIS_PORT=6379 - REDIS_PORT=6379
# RTMP ingestion host reachable from backend container
- RTMP_HOST=nginx-rtmp
- HLS_BASE_URL=http://nginx-rtmp:8081 - HLS_BASE_URL=http://nginx-rtmp:8081
- HLS_ENDPOINT_PATH=/ - HLS_ENDPOINT_PATH=/
- MEDIA_ROOT=/app/media - MEDIA_ROOT=/app/media
@@ -104,6 +112,30 @@ services:
condition: service_started condition: service_started
command: ./entrypoint-celery.sh --queues=default,config_management --hostname=default-worker@%h command: ./entrypoint-celery.sh --queues=default,config_management --hostname=default-worker@%h
# File Watcher - Monitor HLS segments
file-watcher:
build:
context: ./backend
target: development
volumes:
- ./backend:/app
- ./media:/app/media
environment:
- DEBUG=1
- DB_HOST=postgres
- DB_NAME=media_analyzer
- DB_USER=media_user
- DB_PASSWORD=media_pass
- REDIS_HOST=redis
- REDIS_PORT=6379
- MEDIA_ROOT=/app/media
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
command: python manage.py watch_hls_files --poll-interval=2
# Angular Frontend with NGINX (unified approach) # Angular Frontend with NGINX (unified approach)
frontend: frontend:
build: build:
@@ -145,7 +177,9 @@ services:
- "0.0.0.0:1935:1935" # RTMP port - bind to all interfaces - "0.0.0.0:1935:1935" # RTMP port - bind to all interfaces
- "0.0.0.0:8081:80" # HTTP port for HLS - bind to all interfaces - "0.0.0.0:8081:80" # HTTP port for HLS - bind to all interfaces
volumes: volumes:
- ./media:/var/www/html # Persist HLS segments and playlists to host media directory
- ./media:/var/www/media
# Custom nginx.conf for RTMP/HLS configuration
- ./docker/nginx.conf:/etc/nginx/nginx.conf - ./docker/nginx.conf:/etc/nginx/nginx.conf
depends_on: depends_on:
- postgres - postgres

View File

@@ -13,7 +13,7 @@ rtmp {
# Turn on HLS # Turn on HLS
hls on; hls on;
hls_path /var/www/html; hls_path /var/www/media;
hls_fragment 3; hls_fragment 3;
hls_playlist_length 60; hls_playlist_length 60;
@@ -34,42 +34,26 @@ http {
server { server {
listen 80; listen 80;
# HLS playlist files
location ~* \.m3u8$ {
add_header Content-Type application/vnd.apple.mpegurl;
add_header Access-Control-Allow-Origin *;
add_header Cache-Control no-cache;
root /var/www/media;
}
# HLS segment files
location ~* \.ts$ {
add_header Content-Type video/mp2t;
add_header Access-Control-Allow-Origin *;
add_header Cache-Control no-cache;
root /var/www/media;
}
# Fallback for other requests
location / { location / {
# Disable cache root /var/www/media;
add_header 'Cache-Control' 'no-cache';
# CORS setup
add_header 'Access-Control-Allow-Origin' '*' always;
add_header 'Access-Control-Expose-Headers' 'Content-Length,Content-Range';
add_header 'Access-Control-Allow-Headers' 'Range';
# allow CORS preflight requests
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Max-Age' 1728000;
add_header 'Content-Type' 'text/plain charset=UTF-8';
add_header 'Content-Length' 0;
return 204;
}
# HLS playlist files
location ~* \.m3u8$ {
add_header Content-Type application/vnd.apple.mpegurl;
add_header Cache-Control no-cache;
root /var/www/html/;
}
# HLS segment files
location ~* \.ts$ {
add_header Content-Type video/mp2t;
add_header Cache-Control no-cache;
root /var/www/html/;
}
# Default location
location / {
root /var/www/html/;
}
} }
} }
} }

View File

@@ -62,15 +62,16 @@ server {
proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header X-Forwarded-Proto $scheme;
} }
# HLS streaming proxy # HLS streaming proxy: route to nginx-rtmp HLS server
location /streaming/ { location /streaming/ {
resolver 127.0.0.11 valid=30s; # HLS proxy: strip /streaming/ prefix and forward to nginx-rtmp
set $backend backend:8000; rewrite ^/streaming/(.*)$ /$1 break;
proxy_pass http://$backend; proxy_pass http://nginx-rtmp;
proxy_http_version 1.1;
proxy_set_header Host $host; proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr; add_header Access-Control-Allow-Origin *;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; add_header Cache-Control no-cache;
proxy_set_header X-Forwarded-Proto $scheme; proxy_buffering off;
} }

View File

@@ -6,7 +6,8 @@
<main class="main-content"> <main class="main-content">
<div class="controls-section"> <div class="controls-section">
<app-stream-control <app-stream-control
(streamSelected)="onStreamSelected($event)"> (streamSelected)="onStreamSelected($event)"
(streamStopped)="onStreamStopped()">
</app-stream-control> </app-stream-control>
</div> </div>

View File

@@ -14,26 +14,44 @@
.main-content { .main-content {
display: grid; display: grid;
grid-template-columns: 1fr 2fr; grid-template-columns: 350px 1fr;
grid-template-rows: auto auto;
grid-template-areas:
"controls viewer"
"controls analysis";
gap: 2rem; gap: 2rem;
max-width: 1400px; max-width: 1400px;
margin: 0 auto; margin: 0 auto;
@media (max-width: 768px) { @media (max-width: 768px) {
grid-template-columns: 1fr; grid-template-columns: 1fr;
grid-template-rows: auto auto auto;
grid-template-areas:
"controls"
"viewer"
"analysis";
gap: 1rem; gap: 1rem;
} }
} }
.controls-section { .controls-section {
grid-area: controls;
background: #f8f9fa; background: #f8f9fa;
padding: 1.5rem; padding: 1.5rem;
border-radius: 8px; border-radius: 8px;
} }
.viewer-section { .viewer-section {
grid-area: viewer;
background: #000; background: #000;
padding: 1rem; padding: 1rem;
border-radius: 8px; border-radius: 8px;
} }
.analysis-section {
grid-area: analysis;
background: #f8f9fa;
padding: 1rem;
border-radius: 8px;
}
} }

View File

@@ -1,12 +1,12 @@
import { Component, OnInit, OnDestroy } from '@angular/core'; import { Component, OnInit, OnDestroy, ViewChild } from '@angular/core';
import { RouterOutlet } from '@angular/router'; import { RouterOutlet } from '@angular/router';
import { HttpClientModule } from '@angular/common/http'; import { HttpClientModule } from '@angular/common/http';
import { StreamControlComponent } from './components/stream-control/stream-control.component'; import { StreamControlComponent } from './components/stream-control/stream-control.component';
import { StreamViewerComponent } from './components/stream-viewer/stream-viewer.component'; import { StreamViewerComponent } from './components/stream-viewer/stream-viewer.component';
import { AnalysisPanelComponent } from './components/analysis-panel/analysis-panel.component'; import { AnalysisPanelComponent } from './components/analysis-panel/analysis-panel.component';
import { AnalysisService } from './services/analysis.service'; import { AnalysisService } from './services/analysis.service';
import { StreamService } from './services/stream.service';
import { DetectionResult, VisualAnalysis, Analysis } from './models/analysis'; import { DetectionResult, VisualAnalysis, Analysis } from './models/analysis';
import { environment } from '../environments/environment';
@Component({ @Component({
selector: 'app-root', selector: 'app-root',
@@ -16,6 +16,8 @@ import { environment } from '../environments/environment';
styleUrl: './app.component.scss' styleUrl: './app.component.scss'
}) })
export class AppComponent implements OnInit, OnDestroy { export class AppComponent implements OnInit, OnDestroy {
@ViewChild(StreamViewerComponent) streamViewer!: StreamViewerComponent;
title = 'Media Analyzer'; title = 'Media Analyzer';
selectedStreamUrl: string = ''; selectedStreamUrl: string = '';
currentStreamId: string = ''; currentStreamId: string = '';
@@ -23,7 +25,10 @@ export class AppComponent implements OnInit, OnDestroy {
currentVisual?: VisualAnalysis; currentVisual?: VisualAnalysis;
recentAnalyses: Analysis[] = []; recentAnalyses: Analysis[] = [];
constructor(private analysisService: AnalysisService) {} constructor(
private analysisService: AnalysisService,
private streamService: StreamService
) {}
ngOnInit() { ngOnInit() {
// Subscribe to analysis updates // Subscribe to analysis updates
@@ -47,20 +52,63 @@ export class AppComponent implements OnInit, OnDestroy {
onStreamSelected(streamUrl: string) { onStreamSelected(streamUrl: string) {
console.log('App received stream URL:', streamUrl); console.log('App received stream URL:', streamUrl);
// Convert backend URL to browser-accessible URL using environment config // Extract filename from backend URL, then construct a browser-resolvable URL
const browserUrl = streamUrl.replace(/^http:\/\/[^\/]+/, environment.hlsBaseUrl); const filename = streamUrl.split('/').pop() || '';
this.selectedStreamUrl = browserUrl; this.selectedStreamUrl = `/streaming/${filename}`;
console.log('Converted to browser URL:', browserUrl); console.log('Using HLS URL:', this.selectedStreamUrl);
// Extract stream ID from filename: 476c0bd7-d037-4b6c-a29d-0773c19a76c5.m3u8 // Retry function to get active stream (with small delays to allow DB update)
const streamIdMatch = streamUrl.match(/([0-9a-f-]+)\.m3u8/); const getActiveStreamWithRetry = (attempt = 1, maxAttempts = 3) => {
this.streamService.getStreams().subscribe({
next: (response) => {
const activeStream = response.streams.find(stream => stream.status === 'active');
if (activeStream) {
this.currentStreamId = activeStream.stream_key;
console.log('Found active stream with key:', this.currentStreamId);
// Connect to WebSocket for this stream
this.analysisService.connectToStream(this.currentStreamId);
} else if (attempt < maxAttempts) {
console.log(`No active stream found (attempt ${attempt}/${maxAttempts}), retrying in 1s...`);
setTimeout(() => getActiveStreamWithRetry(attempt + 1, maxAttempts), 1000);
} else {
console.log('No active stream found after retries, falling back to filename parsing');
this.fallbackToFilenameExtraction(filename);
}
},
error: (error) => {
console.error('Failed to get streams from API:', error);
this.fallbackToFilenameExtraction(filename);
}
});
};
// Start the retry process
getActiveStreamWithRetry();
}
private fallbackToFilenameExtraction(filename: string) {
const streamIdMatch = filename.match(/^([a-zA-Z0-9-]+)\.m3u8$/);
if (streamIdMatch) { if (streamIdMatch) {
this.currentStreamId = streamIdMatch[1]; this.currentStreamId = streamIdMatch[1];
console.log('Extracted stream ID:', this.currentStreamId); console.log('Fallback: Extracted stream ID from filename:', this.currentStreamId);
// Connect to WebSocket for this stream
this.analysisService.connectToStream(this.currentStreamId); this.analysisService.connectToStream(this.currentStreamId);
} else { } else {
console.error('Could not extract stream ID from URL:', streamUrl); console.error('Could not extract stream ID from filename:', filename);
} }
} }
onStreamStopped() {
console.log('Stream stopped - clearing player');
// Clear the stream from player
if (this.streamViewer) {
this.streamViewer.clearStream();
}
// Clear app state
this.selectedStreamUrl = '';
this.currentStreamId = '';
this.currentDetections = [];
this.currentVisual = undefined;
// Disconnect from WebSocket
this.analysisService.disconnect();
}
} }

View File

@@ -1,8 +1,9 @@
import { ApplicationConfig } from '@angular/core'; import { ApplicationConfig } from '@angular/core';
import { provideRouter } from '@angular/router'; import { provideRouter } from '@angular/router';
import { provideHttpClient } from '@angular/common/http';
import { routes } from './app.routes'; import { routes } from './app.routes';
export const appConfig: ApplicationConfig = { export const appConfig: ApplicationConfig = {
providers: [provideRouter(routes)] providers: [provideRouter(routes), provideHttpClient()]
}; };

View File

@@ -1,38 +1,94 @@
<div class="stream-control"> <div class="stream-control">
<h2>Stream Control</h2> <h2>Stream Control</h2>
<!-- Create New Stream --> <!-- Tabs -->
<div class="create-stream"> <div class="tabs">
<h3>Create Stream</h3> <button
<input class="tab"
type="text" [class.active]="activeTab === 'webcam'"
[(ngModel)]="newStreamName" (click)="switchTab('webcam')">
placeholder="Stream name" 📹 Webcam
class="input"> </button>
<button (click)="createStream()" class="btn btn-primary">Create Stream</button> <button
class="tab"
[class.active]="activeTab === 'rtmp'"
(click)="switchTab('rtmp')">
📡 RTMP Stream
</button>
</div> </div>
<!-- Stream List --> <!-- Webcam Tab -->
<div class="streams-list"> <div class="tab-content" *ngIf="activeTab === 'webcam'">
<h3>Streams</h3> <div class="webcam-section">
<div class="stream-item" *ngFor="let stream of streams"> <h3>Webcam Stream</h3>
<div class="stream-info"> <p class="description">Start your webcam for real-time logo detection</p>
<h4>{{ stream.name }}</h4>
<p class="status">Status: <span [class]="'status-' + stream.status">{{ stream.status }}</span></p> <button (click)="startWebcam()" class="btn btn-webcam">📹 Start Webcam</button>
<p class="rtmp-url">RTMP URL: <code>{{ stream.rtmp_ingest_url }}</code></p>
<p class="stream-key">Stream Key: <code>{{ stream.stream_key }}</code></p> <!-- Webcam Stream List -->
<div class="streams-list" *ngIf="webcamStreams.length > 0">
<h4>Active Webcam Stream</h4>
<div class="stream-item" *ngFor="let stream of webcamStreams">
<div class="stream-info">
<h4>{{ stream.name }}</h4>
<p class="status">Status: <span [class]="'status-' + stream.status">{{ stream.status }}</span></p>
<p class="stream-key">Stream Key: <code>{{ stream.stream_key }}</code></p>
</div>
<div class="stream-actions">
<button (click)="startStream(stream)"
[disabled]="stream.status === 'active'"
class="btn btn-success">Start</button>
<button (click)="stopStream(stream)"
[disabled]="stream.status === 'inactive'"
class="btn btn-danger">Stop</button>
<button (click)="selectStream(stream)"
*ngIf="stream.hls_playlist_url"
class="btn btn-info">Load</button>
</div>
</div>
</div>
</div>
</div>
<!-- RTMP Tab -->
<div class="tab-content" *ngIf="activeTab === 'rtmp'">
<div class="rtmp-section">
<!-- Create New Stream -->
<div class="create-stream">
<h3>Create RTMP Stream</h3>
<p class="description">Create a stream for OBS or other RTMP sources</p>
<input
type="text"
[(ngModel)]="newStreamName"
placeholder="Stream name"
class="input">
<button (click)="createStream()" class="btn btn-primary">Create Stream</button>
</div> </div>
<div class="stream-actions"> <!-- RTMP Stream List -->
<button (click)="startStream(stream)" <div class="streams-list" *ngIf="rtmpStreams.length > 0">
[disabled]="stream.status === 'active'" <h4>RTMP Streams</h4>
class="btn btn-success">Start</button> <div class="stream-item" *ngFor="let stream of rtmpStreams">
<button (click)="stopStream(stream)" <div class="stream-info">
[disabled]="stream.status === 'inactive'" <h4>{{ stream.name }}</h4>
class="btn btn-danger">Stop</button> <p class="status">Status: <span [class]="'status-' + stream.status">{{ stream.status }}</span></p>
<button (click)="selectStream(stream)" <p class="rtmp-url">RTMP URL: <code>{{ stream.rtmp_ingest_url }}</code></p>
*ngIf="stream.hls_playlist_url" <p class="stream-key">Stream Key: <code>{{ stream.stream_key }}</code></p>
class="btn btn-info">Load</button> </div>
<div class="stream-actions">
<button (click)="startStream(stream)"
[disabled]="stream.status === 'active'"
class="btn btn-success">Start</button>
<button (click)="stopStream(stream)"
[disabled]="stream.status === 'inactive'"
class="btn btn-danger">Stop</button>
<button (click)="selectStream(stream)"
*ngIf="stream.hls_playlist_url"
class="btn btn-info">Load</button>
</div>
</div>
</div> </div>
</div> </div>
</div> </div>

View File

@@ -0,0 +1,134 @@
.stream-control {
padding: 20px;
}
.tabs {
display: flex;
margin-bottom: 20px;
border-bottom: 2px solid #ddd;
}
.tab {
padding: 12px 24px;
border: none;
background: transparent;
cursor: pointer;
font-size: 16px;
font-weight: 500;
border-bottom: 3px solid transparent;
margin-right: 8px;
transition: all 0.2s;
}
.tab:hover {
background: #f8f9fa;
}
.tab.active {
color: #007bff;
border-bottom-color: #007bff;
background: #f8f9fa;
}
.tab-content {
margin-top: 20px;
}
.webcam-section, .rtmp-section {
min-height: 300px;
}
.description {
color: #666;
margin-bottom: 15px;
font-style: italic;
}
.btn-webcam {
background: #4CAF50;
color: white;
border: none;
padding: 12px 24px;
border-radius: 8px;
cursor: pointer;
font-size: 16px;
font-weight: 500;
}
.btn-webcam:hover {
background: #45a049;
}
.create-stream {
margin-bottom: 20px;
padding: 15px;
border: 1px solid #ddd;
border-radius: 8px;
}
.input {
padding: 8px;
margin-right: 10px;
border: 1px solid #ddd;
border-radius: 4px;
min-width: 200px;
}
.btn {
padding: 8px 16px;
border: none;
border-radius: 4px;
cursor: pointer;
margin-right: 8px;
}
.btn-primary { background: #007bff; color: white; }
.btn-success { background: #28a745; color: white; }
.btn-danger { background: #dc3545; color: white; }
.btn-info { background: #17a2b8; color: white; }
.btn:disabled {
background: #6c757d !important;
cursor: not-allowed;
}
.streams-list {
border: 1px solid #ddd;
border-radius: 8px;
padding: 15px;
}
.stream-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 15px;
border-bottom: 1px solid #eee;
}
.stream-item:last-child {
border-bottom: none;
}
.stream-info h4 {
margin: 0 0 8px 0;
}
.source-type {
font-weight: normal;
font-size: 0.8em;
color: #666;
}
.status-active { color: #28a745; font-weight: bold; }
.status-inactive { color: #6c757d; }
.status-starting { color: #ffc107; }
.status-stopping { color: #fd7e14; }
.status-error { color: #dc3545; font-weight: bold; }
code {
background: #f8f9fa;
padding: 2px 4px;
border-radius: 3px;
font-size: 0.9em;
}

View File

@@ -1,20 +1,8 @@
import { Component, EventEmitter, Output } from '@angular/core'; import { Component, EventEmitter, Output } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { CommonModule } from '@angular/common'; import { CommonModule } from '@angular/common';
import { FormsModule } from '@angular/forms'; import { FormsModule } from '@angular/forms';
import { environment } from '../../../environments/environment'; import { StreamService } from '../../services/stream.service';
import { Stream } from '../../models/stream';
interface Stream {
id: number;
name: string;
source_type: string;
processing_mode: string;
status: string;
stream_key: string;
hls_playlist_url: string | null;
rtmp_ingest_url: string;
created_at: string;
}
@Component({ @Component({
selector: 'app-stream-control', selector: 'app-stream-control',
@@ -25,17 +13,19 @@ interface Stream {
}) })
export class StreamControlComponent { export class StreamControlComponent {
@Output() streamSelected = new EventEmitter<string>(); @Output() streamSelected = new EventEmitter<string>();
@Output() streamStopped = new EventEmitter<void>();
streams: Stream[] = []; streams: Stream[] = [];
newStreamName = ''; newStreamName = '';
selectedStream: Stream | null = null; selectedStream: Stream | null = null;
activeTab: 'rtmp' | 'webcam' = 'webcam';
constructor(private http: HttpClient) { constructor(private streamService: StreamService) {
this.loadStreams(); this.loadStreams();
} }
loadStreams() { loadStreams() {
this.http.get<{streams: Stream[]}>(`${environment.apiUrl}/streams/`).subscribe({ this.streamService.getStreams().subscribe({
next: (response) => { next: (response) => {
this.streams = response.streams; this.streams = response.streams;
}, },
@@ -46,7 +36,7 @@ export class StreamControlComponent {
createStream() { createStream() {
if (!this.newStreamName) return; if (!this.newStreamName) return;
this.http.post<Stream>(`${environment.apiUrl}/streams/create/`, { this.streamService.createStream({
name: this.newStreamName, name: this.newStreamName,
source_type: 'rtmp', source_type: 'rtmp',
processing_mode: 'live' processing_mode: 'live'
@@ -60,8 +50,12 @@ export class StreamControlComponent {
} }
startStream(stream: Stream) { startStream(stream: Stream) {
this.http.post(`${environment.apiUrl}/streams/${stream.id}/start/`, {}).subscribe({ this.streamService.startStream(stream.stream_key).subscribe({
next: () => { next: (response) => {
console.log('Stream started, HLS URL:', response.hls_playlist_url);
// Emit the stream selection immediately with the HLS URL from response
this.streamSelected.emit(response.hls_playlist_url);
// Then reload streams to get updated status
this.loadStreams(); this.loadStreams();
}, },
error: (error) => console.error('Error starting stream:', error) error: (error) => console.error('Error starting stream:', error)
@@ -69,14 +63,56 @@ export class StreamControlComponent {
} }
stopStream(stream: Stream) { stopStream(stream: Stream) {
this.http.post(`${environment.apiUrl}/streams/${stream.id}/stop/`, {}).subscribe({ this.streamService.stopStream(stream.stream_key).subscribe({
next: () => { next: () => {
this.loadStreams(); this.loadStreams();
// Emit event to clear the player
this.streamStopped.emit();
}, },
error: (error) => console.error('Error stopping stream:', error) error: (error) => console.error('Error stopping stream:', error)
}); });
} }
startWebcam() {
this.streamService.startWebcamStream().subscribe({
next: (stream) => {
this.loadStreams();
// Backend now waits for HLS to be ready, so we can directly select
this.selectStream(stream);
},
error: (error) => {
console.error('Error starting webcam:', error);
if (error.status === 409) {
const activeStreamKey = error.error.active_stream_key;
if (activeStreamKey) {
console.log(`Stopping active stream ${activeStreamKey} before retrying webcam`);
this.streamService.stopStream(activeStreamKey).subscribe({
next: () => this.startWebcam(),
error: (stopError) => {
console.error('Error stopping active stream:', stopError);
alert(`Cannot start webcam: ${error.error.error}`);
}
});
} else {
alert(`Cannot start webcam: ${error.error.error}`);
}
}
}
});
}
switchTab(tab: 'rtmp' | 'webcam') {
this.activeTab = tab;
}
get rtmpStreams() {
return this.streams.filter(stream => stream.source_type === 'rtmp');
}
get webcamStreams() {
return this.streams.filter(stream => stream.source_type === 'webcam');
}
selectStream(stream: Stream) { selectStream(stream: Stream) {
this.selectedStream = stream; this.selectedStream = stream;
if (stream.hls_playlist_url) { if (stream.hls_playlist_url) {

View File

@@ -188,4 +188,24 @@ export class StreamViewerComponent implements AfterViewInit, OnDestroy, OnChange
const canvas = this.overlayElement.nativeElement; const canvas = this.overlayElement.nativeElement;
this.ctx.clearRect(0, 0, canvas.width, canvas.height); this.ctx.clearRect(0, 0, canvas.width, canvas.height);
} }
clearStream() {
const video = this.videoElement.nativeElement;
// Stop and clear HLS
if (this.hls) {
this.hls.destroy();
this.hls = undefined;
}
// Clear video source and stop playback
video.src = '';
video.srcObject = null;
video.load(); // Reset video element
// Clear overlay
this.clearOverlay();
console.log('Stream cleared');
}
} }

View File

@@ -1,2 +1,11 @@
export interface Stream { export interface Stream {
id: number;
name: string;
source_type: 'rtmp' | 'webcam' | 'file' | 'url';
processing_mode: 'live' | 'batch';
status: 'inactive' | 'starting' | 'active' | 'stopping' | 'error';
stream_key: string;
hls_playlist_url?: string;
rtmp_ingest_url?: string;
created_at: string;
} }

View File

@@ -10,6 +10,7 @@ export class AnalysisService {
private currentDetections = new BehaviorSubject<DetectionResult[]>([]); private currentDetections = new BehaviorSubject<DetectionResult[]>([]);
private currentVisual = new BehaviorSubject<VisualAnalysis | null>(null); private currentVisual = new BehaviorSubject<VisualAnalysis | null>(null);
private recentAnalyses = new BehaviorSubject<Analysis[]>([]); private recentAnalyses = new BehaviorSubject<Analysis[]>([]);
private streamStartTime: Date | null = null;
public detections$ = this.currentDetections.asObservable(); public detections$ = this.currentDetections.asObservable();
public visual$ = this.currentVisual.asObservable(); public visual$ = this.currentVisual.asObservable();
@@ -23,16 +24,34 @@ export class AnalysisService {
} }
connectToStream(streamId: string) { connectToStream(streamId: string) {
this.websocketService.connect(streamId); this.streamStartTime = new Date();
this.websocketService.subscribe(streamId);
} }
disconnect() { disconnect() {
this.websocketService.unsubscribe();
this.websocketService.disconnect(); this.websocketService.disconnect();
this.currentDetections.next([]); this.currentDetections.next([]);
this.currentVisual.next(null); this.currentVisual.next(null);
this.streamStartTime = null;
} }
private handleAnalysisUpdate(analysis: Analysis) { private handleAnalysisUpdate(analysis: Analysis) {
console.log('Received analysis update:', analysis);
// Filter out analysis from before stream started (with 30 second buffer for recent analysis)
if (this.streamStartTime && analysis.timestamp) {
const analysisTime = new Date(analysis.timestamp);
const bufferTime = new Date(this.streamStartTime.getTime() - 30000); // 30 seconds before stream start
if (analysisTime < bufferTime) {
console.log('Ignoring old analysis from before stream start:', {
analysisTime: analysisTime.toISOString(),
streamStart: this.streamStartTime.toISOString()
});
return;
}
}
// Update recent analyses list // Update recent analyses list
const current = this.recentAnalyses.value; const current = this.recentAnalyses.value;
const updated = [analysis, ...current.slice(0, 9)]; // Keep last 10 const updated = [analysis, ...current.slice(0, 9)]; // Keep last 10

View File

@@ -1,9 +1,34 @@
import { Injectable } from '@angular/core'; import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { Stream } from '../models/stream';
import { environment } from '../../environments/environment';
@Injectable({ @Injectable({
providedIn: 'root' providedIn: 'root'
}) })
export class StreamService { export class StreamService {
private apiUrl = `${environment.apiUrl}/streaming`;
constructor() { } constructor(private http: HttpClient) { }
getStreams(): Observable<{streams: Stream[]}> {
return this.http.get<{streams: Stream[]}>(`${this.apiUrl}/streams/`);
}
createStream(streamData: any): Observable<Stream> {
return this.http.post<Stream>(`${this.apiUrl}/streams/create/`, streamData);
}
startWebcamStream(): Observable<Stream> {
return this.http.post<Stream>(`${this.apiUrl}/streams/webcam/start/`, {});
}
startStream(streamKey: string): Observable<{message: string, hls_playlist_url: string}> {
return this.http.post<{message: string, hls_playlist_url: string}>(`${this.apiUrl}/streams/${streamKey}/start/`, {});
}
stopStream(streamKey: string): Observable<{message: string}> {
return this.http.post<{message: string}>(`${this.apiUrl}/streams/${streamKey}/stop/`, {});
}
} }

View File

@@ -7,6 +7,7 @@ import { Analysis } from '../models/analysis';
}) })
export class WebsocketService { export class WebsocketService {
private socket?: WebSocket; private socket?: WebSocket;
private currentStreamId?: string;
private analysisSubject = new Subject<Analysis>(); private analysisSubject = new Subject<Analysis>();
private connectionStatus = new BehaviorSubject<boolean>(false); private connectionStatus = new BehaviorSubject<boolean>(false);
@@ -15,13 +16,13 @@ export class WebsocketService {
constructor() { } constructor() { }
connect(streamId: string) { connect() {
if (this.socket?.readyState === WebSocket.OPEN) { if (this.socket?.readyState === WebSocket.OPEN) {
return; return;
} }
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws/stream/${streamId}/`; const wsUrl = `${protocol}//${window.location.host}/ws/stream/`;
console.log('Connecting to WebSocket:', wsUrl); console.log('Connecting to WebSocket:', wsUrl);
this.socket = new WebSocket(wsUrl); this.socket = new WebSocket(wsUrl);
@@ -63,6 +64,7 @@ export class WebsocketService {
if (this.socket) { if (this.socket) {
this.socket.close(); this.socket.close();
this.socket = undefined; this.socket = undefined;
this.currentStreamId = undefined;
this.connectionStatus.next(false); this.connectionStatus.next(false);
} }
} }
@@ -79,4 +81,17 @@ export class WebsocketService {
timestamp: Date.now() timestamp: Date.now()
}); });
} }
subscribe(streamId: string) {
this.currentStreamId = streamId;
this.connect();
this.send({ type: 'subscribe', stream_id: streamId });
}
unsubscribe() {
if (this.currentStreamId) {
this.send({ type: 'unsubscribe', stream_id: this.currentStreamId });
this.currentStreamId = undefined;
}
}
} }

View File

@@ -18,19 +18,20 @@ video structure
keyboards (early days) keyboards (early days)
music vs coding (the gap gets wider) music vs coding (the gap gets wider)
recurrent back to basics recurrent back to basics
behind the scenes
the setup
deskmeter
timelapses/ffmpeg
demo demo
phase 1 phase 1
phase 2 phase 2
phase 3 phase 3
extras extras
behind the scenes opinions
the setup
deskmeter
timelapses/ffmpeg
make your own path
bootcamps bootcamps
pimp-up-your-profile new trend pimp-up-your-profile new trend
for seenka show the current state of my use of AI tools
motivations
im not in it (just) for the money im not in it (just) for the money
video processing is my passion (? video processing is my passion (?