#!/usr/bin/env python3
"""
Simulate a full pipeline run — pushes all event types in sequence.

Usage:
    python tests/detect/manual/push_pipeline.py [--job JOB_ID] [--host HOST] [--port PORT] [--delay SECS]

Opens: http://mpr.local.ar/detection/?job=JOB_ID
"""

import argparse
import json
import time
from datetime import datetime, timezone


def ts() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def push(r, key: str, event: dict) -> dict:
    """Append *event* as JSON to the Redis list *key* and echo it to stdout.

    The event dict is modified in place: a ``"ts"`` field holding the current
    UTC time is added when the caller did not supply one.

    Args:
        r: Client object exposing ``rpush(key, value)`` (a ``redis.Redis``
            instance in this script).
        key: Name of the Redis list that receives the event.
        event: Event payload; must contain an ``"event"`` field.

    Returns:
        The same ``event`` dict, including its ``"ts"`` field.

    Raises:
        KeyError: If ``event`` has no ``"event"`` field. Nothing is pushed in
            that case.
    """
    # Read the event type first so a malformed event is rejected before it
    # reaches Redis rather than after.
    etype = event["event"]
    if "ts" not in event:
        event["ts"] = ts()
    r.rpush(key, json.dumps(event))
    detail = event.get("msg", event.get("stage", ""))
    print(f" [{etype:14s}] {detail}")
    return event


def _simulate(r, key: str, job_id: str, delay: float) -> None:
    """Push the scripted sequence of pipeline events to the list *key*.

    Args:
        r: Client object passed through to ``push``.
        key: Redis list receiving the events.
        job_id: Value reported as ``job_id`` in the final ``job_complete`` event.
        delay: Base pause in seconds after each event; some events use a
            fraction of it.
    """
    content_type = "soccer_broadcast"

    # Running counters: every stats_update event carries the full set, with
    # only the fields changed by the current stage updated.
    stats = {
        "frames_extracted": 0,
        "frames_after_scene_filter": 0,
        "regions_detected": 0,
        "regions_resolved_by_ocr": 0,
        "regions_escalated_to_local_vlm": 0,
        "regions_escalated_to_cloud_llm": 0,
        "cloud_llm_calls": 0,
        "processing_time_seconds": 0,
        "estimated_cloud_cost_usd": 0,
    }

    def emit(event: dict, pause: float = 1.0) -> None:
        """Push one event, then sleep for ``delay * pause`` seconds."""
        push(r, key, event)
        time.sleep(delay * pause)

    def log(stage: str, msg: str, level: str = "INFO", pause: float = 1.0) -> None:
        """Emit a log event for the given pipeline stage."""
        emit({"event": "log", "level": level, "stage": stage, "msg": msg}, pause)

    def stats_update(**changes) -> None:
        """Apply *changes* to the running counters and emit them all."""
        stats.update(changes)
        emit({"event": "stats_update", **stats})

    def detection(brand, confidence, source, timestamp, duration, frame_ref) -> None:
        """Emit a single brand detection event."""
        emit(
            {
                "event": "detection",
                "brand": brand,
                "confidence": confidence,
                "source": source,
                "timestamp": timestamp,
                "duration": duration,
                "content_type": content_type,
                "frame_ref": frame_ref,
            },
            pause=0.3,
        )

    # Stage 1: Frame extraction
    log("FrameExtractor",
        "Starting extraction: soccer_clip.mp4 (60.0s, 1920x1080, fps=2)")
    stats_update(frames_extracted=120, processing_time_seconds=3.2)
    log("FrameExtractor", "Extracted 120 frames")

    # Stage 2: Scene filter
    log("SceneFilter", "Filtering duplicates (hamming_threshold=8)")
    stats_update(frames_after_scene_filter=45, processing_time_seconds=5.1)
    log("SceneFilter", "Kept 45 frames (62.5% reduction)")

    # Stage 3: YOLO detection
    log("YOLODetector", "Loading yolov8n.pt (fp16, 1.2GB VRAM)")
    for batch in range(1, 4):
        log("YOLODetector", f"Processing batch {batch}/3 (15 frames)",
            level="DEBUG", pause=0.5)
    stats_update(regions_detected=32, processing_time_seconds=12.4)

    # Stage 4: OCR
    log("OCRStage", "Running PaddleOCR on 32 regions")
    stats_update(regions_resolved_by_ocr=24, processing_time_seconds=18.7)

    # Stage 5: Brand resolver
    log("BrandResolver", "Matched 20 exact, 4 fuzzy. 8 unresolved → VLM")

    # Emit some detections
    ocr_hits = [("Nike", 0.95), ("Emirates", 0.91), ("Adidas", 0.88), ("Coca-Cola", 0.82)]
    for brand, conf in ocr_hits:
        detection(brand, conf, "ocr", 12.5, 0.5, 25)

    # Stage 6: VLM escalation
    log("VLMLocal", "Processing 8 unresolved crops with moondream2")
    log("VLMLocal", "Low confidence on 2 crops, escalating to cloud LLM",
        level="WARNING")
    stats_update(
        regions_escalated_to_local_vlm=8,
        regions_escalated_to_cloud_llm=2,
        cloud_llm_calls=2,
        processing_time_seconds=28.3,
        estimated_cloud_cost_usd=0.0042,
    )

    # More detections from VLM
    detection("Mastercard", 0.76, "local_vlm", 34.0, 1.0, 68)
    detection("Heineken", 0.71, "cloud_llm", 45.5, 0.5, 91)

    # Final
    log("Aggregator", "Report complete: 6 brands, 26 total appearances")
    # The closing event is pushed directly: no pause is needed after it.
    push(r, key, {
        "event": "job_complete",
        "job_id": job_id,
        "report": {
            "video_source": "soccer_clip.mp4",
            "content_type": content_type,
            "duration_seconds": 60.0,
            "brands": {
                "Nike": {"total_appearances": 8, "total_screen_time": 4.0, "avg_confidence": 0.93},
                "Emirates": {"total_appearances": 6, "total_screen_time": 3.0, "avg_confidence": 0.89},
                "Adidas": {"total_appearances": 5, "total_screen_time": 2.5, "avg_confidence": 0.85},
                "Coca-Cola": {"total_appearances": 4, "total_screen_time": 2.0, "avg_confidence": 0.80},
                "Mastercard": {"total_appearances": 2, "total_screen_time": 1.0, "avg_confidence": 0.76},
                "Heineken": {"total_appearances": 1, "total_screen_time": 0.5, "avg_confidence": 0.71},
            },
        },
    })


def main() -> None:
    """Parse CLI options, clear the job's event list, and replay the run.

    Exits with an error message if ``--delay`` is negative or if the Redis
    server cannot be reached.
    """
    parser = argparse.ArgumentParser(
        description="Push a simulated detection pipeline run to Redis.",
    )
    parser.add_argument("--job", default="pipeline-test",
                        help="job id used in the Redis key and the viewer URL")
    parser.add_argument("--host", default="localhost", help="Redis host")
    parser.add_argument("--port", type=int, default=6382, help="Redis port")
    parser.add_argument("--delay", type=float, default=0.5,
                        help="base pause between events, in seconds")
    args = parser.parse_args()
    if args.delay < 0:
        # time.sleep() rejects negative values; fail before pushing anything.
        parser.error("--delay must be >= 0")

    # Imported here, after argument parsing, so that --help and the helper
    # functions above work even when the redis package is not installed.
    import redis

    r = redis.Redis(host=args.host, port=args.port, decode_responses=True)
    key = f"detect_events:{args.job}"

    try:
        # Clear previous events for this job
        r.delete(key)

        print(f"Simulating pipeline run → {key}")
        print(f"Open: http://mpr.local.ar/detection/?job={args.job}")
        print()

        _simulate(r, key, args.job, args.delay)
    except redis.ConnectionError as exc:
        raise SystemExit(
            f"Cannot reach Redis at {args.host}:{args.port}: {exc}"
        ) from exc

    print("\nPipeline simulation complete.")


if __name__ == "__main__":
    main()