Two issues here:

1. Container isolation problem - File-watcher creates temp directory in its container, but celery-logo can't access it (different containers)
2. Strategy logging spam - "Configured execution strategy: local" logs on every task

Let me fix both:

Fix 1: Use shared volume instead of container-local temp directory

● Update(media-analyzer/backend/streaming/file_watcher.py)
  ⎿ User rejected update to media-analyzer/backend/streaming/file_watcher.py
       18            self.processed_files = set()
       19            self.analyzer = VideoAnalyzer()
       20
       21 -          # Create a persistent directory for analysis segments
       21 +          # Create a persistent directory for analysis segments in shared volume
       22            self.analysis_dir = self.media_dir / 'analysis_segments'
       23            try:
       24 -              self.analysis_dir.mkdir(exist_ok=True)
       24 +              self.analysis_dir.mkdir(exist_ok=True, mode=0o755)
       25 +              logger.info(f"File watcher: Using analysis directory: {self.analysis_dir}")
       26            except PermissionError:
       27 -              logger.warning(f"File watcher: Cannot create {self.analysis_dir}, using temp directory")
       27 +              logger.warning(f"File watcher: Cannot create {self.analysis_dir}, using shared temp directory")
       28 +              # Use temp directory within the shared volume so celery can access it
       29                import tempfile
       30 -              self.analysis_dir = Path(tempfile.mkdtemp(prefix='analysis_segments_'))
       30 +              self.analysis_dir = Path(tempfile.mkdtemp(dir=self.media_dir, prefix='analysis_'))
       31                logger.info(f"File watcher: Using temporary analysis directory: {self.analysis_dir}")
       32
       33            # Track copied files for cleanup

> how would this architecture design scale when moving to k8s or hybrid deployments

● Excellent question! The current architecture has scaling challenges that need addressing for K8s/hybrid deployments:

Current Architecture Limitations

Single Point of Failure:
  File-watcher (1 instance) → Temp Directory → Multiple Celery Workers
- Only one file-watcher instance can run
- Temp directory is container-local (doesn't work across pods)
- No coordination between file-watchers if scaled

K8s-Ready Architecture Design

Option 1: Event-Driven with Message Queues

  RTMP/HLS → File Events → Redis Streams → Celery Tasks → Analysis Results

Implementation:
- File events go directly to Redis/RabbitMQ instead of filesystem watching
- Multiple file-watcher pods can process different streams
- Celery workers scale independently across nodes
- No shared filesystem dependency

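A minimal sketch of what Option 1 could look like with Redis Streams via redis-py; the stream name `segment_events`, the consumer-group name, and all helper names here are assumptions for illustration, not part of the existing codebase:

```python
from typing import Dict


def build_segment_event(stream_key: str, segment_path: str) -> Dict[str, str]:
    # Serialize a new-segment notification; this replaces filesystem watching
    return {"stream_key": stream_key, "path": segment_path}


def publish_segment_event(r, stream_key: str, segment_path: str):
    """Append the event to a Redis Stream ('r' is a redis.Redis client).
    Each file-watcher pod publishes for its own streams, no coordination needed."""
    return r.xadd("segment_events", build_segment_event(stream_key, segment_path))


def consume_segment_events(r, handler, group="analyzers", consumer="worker-1"):
    """Read events through a consumer group so multiple worker pods split the
    load; XACK marks an event done, and unacked events can be re-delivered."""
    try:
        r.xgroup_create("segment_events", group, id="0", mkstream=True)
    except Exception:
        pass  # group already exists
    for _stream, messages in r.xreadgroup(group, consumer,
                                          {"segment_events": ">"},
                                          count=10, block=5000):
        for msg_id, fields in messages:
            handler(fields)  # hand off to the analysis task
            r.xack("segment_events", group, msg_id)
```

Because events are load-balanced by the consumer group rather than by who sees a file first, adding worker pods needs no changes anywhere else.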
Option 2: Object Storage + Event Notifications

  RTMP → Cloud Storage → Storage Events → Lambda/K8s Jobs → Analysis

GCP Implementation:
  # segments uploaded to GCS trigger Cloud Functions
  # Cloud Functions push tasks to Cloud Tasks/Pub/Sub
  # Celery workers pull from Pub/Sub instead of file watching

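A sketch of the GCS-to-Pub/Sub bridge for Option 2. The function names and the `<stream_key>/<segment>` object layout are assumptions; the `bucket`/`name` fields follow the GCS object-finalize event schema:

```python
import json


def build_pubsub_payload(event: dict) -> bytes:
    """Turn a GCS object-finalize event into the task message workers consume.
    Assumes objects are laid out as <stream_key>/<segment_file>."""
    return json.dumps({
        "bucket": event["bucket"],
        "path": event["name"],
        "stream_key": event["name"].split("/")[0],
    }).encode("utf-8")


def on_segment_uploaded(event, context, publisher=None, topic_path=None):
    """Cloud Functions entry point for google.storage.object.finalize events.
    'publisher' would be a google.cloud.pubsub_v1.PublisherClient; it is
    injected here so the payload logic stays testable without GCP."""
    payload = build_pubsub_payload(event)
    if publisher is not None:
        publisher.publish(topic_path, data=payload)
    return payload
```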
Option 3: StatefulSet + Persistent Volumes

  File-watcher StatefulSet → PVC → Celery Deployment

Current Architecture Enhanced:
- File-watcher as StatefulSet with persistent volume
- Analysis segments on shared PVC (ReadWriteMany)
- Multiple celery pods access same PVC

Hybrid Deployment Patterns

Local Development

  docker compose up  # Current setup works fine

Single VM K8s (k3s/kind)

  # Use hostPath volumes
  volumes:
    - name: media-storage
      hostPath:
        path: /opt/media-analyzer/segments

Cloud K8s (GKE/EKS)

  # Use cloud storage + events
  env:
    - name: USE_CLOUD_STORAGE
      value: "true"
    - name: GCS_BUCKET
      value: "media-analyzer-segments"

Recommended Migration Path

Phase 1: Make Current Architecture K8s-Compatible

  # Add environment-aware file watching (inside the file-watcher's __init__)
  import os
  from pathlib import Path

  if os.getenv('KUBERNETES_SERVICE_HOST'):
      # Use shared PVC path
      self.analysis_dir = Path('/shared-storage/analysis_segments')
  else:
      # Use local docker volume
      self.analysis_dir = self.media_dir / 'analysis_segments'

Phase 2: Event-Driven Refactor

  # Replace file system events with message events
  class VideoSegmentProcessor:
      def handle_new_segment(self, segment_event):
          # segment_event contains: bucket, path, stream_key
          # No file copying needed - process from object storage
          ...

Phase 3: Full Cloud-Native

  # Kubernetes CronJob for cleanup instead of in-process cleanup
  # Horizontal Pod Autoscaler based on queue depth
  # Cloud storage for persistence
  # Cloud events for coordination

Performance & Scaling Considerations

Current Bottlenecks:
- Single file-watcher instance
- File copying overhead
- Shared filesystem dependency

K8s Solutions:
- HPA: Scale celery workers based on Redis queue length
- Multiple file-watchers: One per stream or region
- Node affinity: Co-locate file-watcher with RTMP pods
- Resource quotas: Prevent memory leaks from affecting cluster

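The HPA-on-queue-depth idea needs a queue-length metric to scale on. A sketch, assuming the Redis broker, where each Celery queue is a plain Redis list (the default queue is named "celery"):

```python
def celery_queue_depth(r, queue: str = "celery") -> int:
    """Return the number of pending tasks in a Celery queue on a Redis broker.

    With the Redis broker, each queue is a Redis list, so LLEN reports pending
    (not-yet-reserved) tasks. A small exporter polling this value can feed a
    custom metric for the Horizontal Pod Autoscaler. 'r' is a redis.Redis client."""
    return int(r.llen(queue))
```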
Configuration Strategy

  # Environment-aware configuration
  import os

  class ScalingConfig:
      def __init__(self):
          self.deployment_type = self.detect_environment()

      def detect_environment(self):
          if os.getenv('KUBERNETES_SERVICE_HOST'):
              if os.getenv('GCP_PROJECT_ID'):
                  return 'gke'
              return 'k8s'
          return 'docker'

      def get_storage_backend(self):
          # LocalFileSystem/SharedVolume/CloudStorage are app-specific backends;
          # look up the class and instantiate only the one that is needed
          backends = {
              'docker': LocalFileSystem,
              'k8s': SharedVolume,
              'gke': CloudStorage,
          }
          return backends[self.deployment_type]()

The current architecture works well for Docker Compose but needs the event-driven refactor for true K8s scalability. The hybrid approach lets you migrate incrementally while maintaining backward compatibility.