diff --git a/.gitignore b/.gitignore
index b12d928..443f1dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,10 +17,8 @@ env/
 *.pot
 *.pyc
 db.sqlite3
-media/in/*
-!media/in/.gitkeep
-media/out/*
-!media/out/.gitkeep
+media/*
+!media/.gitkeep
 
 # Node
 node_modules/
@@ -39,3 +37,4 @@ Thumbs.db
 
 # Project specific
 def/
+ctrl/k8s/overlays/dev/local-config.yaml
diff --git a/core/api/detect_sources.py b/core/api/detect_sources.py
index 0ee97e5..697f0be 100644
--- a/core/api/detect_sources.py
+++ b/core/api/detect_sources.py
@@ -168,7 +168,7 @@ def run_pipeline(req: RunRequest):
     from detect.state import DetectState
 
     local_path = _resolve_video_path(req.video_path)
-    job_id = str(uuid.uuid4())[:8]
+    job_id = str(uuid.uuid4())
 
     if req.skip_vlm:
         os.environ["SKIP_VLM"] = "1"
@@ -200,8 +200,6 @@ def run_pipeline(req: RunRequest):
         source_asset_id=req.source_asset_id,
     )
 
-    import traceback
-
     from detect.graph import PipelineCancelled, set_cancel_check, clear_cancel_check
     set_cancel_check(job_id, lambda: job_id in _cancelled_jobs)
 
@@ -218,9 +216,17 @@ def run_pipeline(req: RunRequest):
         emit.job_complete(job_id, {"status": "cancelled"})
     except Exception as e:
         logger.exception("Pipeline run %s failed: %s", job_id, e)
-        tb = traceback.format_exc()
+        # Mark the current/last stage as error in the graph
+        from detect.graph import _node_states, NODES
+        if job_id in _node_states:
+            states = _node_states[job_id]
+            for node in reversed(NODES):
+                if states.get(node) in ("running", "done"):
+                    states[node] = "error"
+                    break
+            nodes = [{"id": n, "status": states[n]} for n in NODES]
+            emit.graph_update(job_id, nodes)
         emit.log(job_id, "Pipeline", "ERROR", str(e))
-        emit.log(job_id, "Pipeline", "DEBUG", tb)
         emit.job_complete(job_id, {"status": "failed", "error": str(e)})
     finally:
         _running_jobs.pop(job_id, None)
diff --git a/core/api/detect_sse.py b/core/api/detect_sse.py
index 309c7ce..41064f6 100644
--- a/core/api/detect_sse.py
+++ b/core/api/detect_sse.py
@@ -26,16 +26,16 @@ router = APIRouter(prefix="/detect", tags=["detect"])
 
 async def _event_generator(job_id: str) -> AsyncGenerator[str, None]:
     cursor = 0
-    timeout = time.monotonic() + 3600  # 1 hour max (detection jobs are long)
+    timeout = time.monotonic() + 3600  # 1 hour max
 
     while time.monotonic() < timeout:
         events, cursor = poll_events(job_id, cursor, prefix=DETECT_EVENTS_PREFIX)
 
         if not events:
-            yield f"event: waiting\ndata: {json.dumps({'job_id': job_id})}\n\n"
-            await asyncio.sleep(0.1)
+            await asyncio.sleep(0.2)
             continue
 
+        is_terminal = False
         for data in events:
             event_type = data.pop("event", "update")
             payload = {**data, "job_id": job_id}
@@ -43,8 +43,15 @@ async def _event_generator(job_id: str) -> AsyncGenerator[str, None]:
             yield f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"
 
             if event_type in TERMINAL_EVENTS:
-                yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n"
-                return
+                is_terminal = True
+
+        if is_terminal:
+            yield f"event: done\ndata: {json.dumps({'job_id': job_id})}\n\n"
+            # Don't return — keep connection alive so EventSource doesn't reconnect.
+            # Just idle until the client disconnects or timeout.
+            while time.monotonic() < timeout:
+                await asyncio.sleep(5)
+            return
 
         await asyncio.sleep(0.05)
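Because the generator now idles after the terminal `done` event instead of closing, a non-browser consumer should stop on `done` itself rather than wait for EOF. A minimal consumer sketch, assuming httpx and a gateway on localhost:8080 (the `/api/detect/stream/<job_id>` path is the one the UI's SSEDataSource uses):

```python
# Sketch only: httpx, the host/port, and the print() handling are assumptions.
import json
import httpx

def follow_job(job_id: str, base: str = "http://localhost:8080") -> None:
    event_type = "update"
    with httpx.stream("GET", f"{base}/api/detect/stream/{job_id}", timeout=None) as resp:
        for line in resp.iter_lines():
            if line.startswith("event: "):
                event_type = line[len("event: "):]
            elif line.startswith("data: "):
                print(event_type, json.loads(line[len("data: "):]))
                # The server keeps the socket open after a terminal event,
                # so the client decides when to disconnect.
                if event_type == "done":
                    return
```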
diff --git a/ctrl/Tiltfile b/ctrl/Tiltfile
index 9562761..b9511c3 100644
--- a/ctrl/Tiltfile
+++ b/ctrl/Tiltfile
@@ -46,6 +46,7 @@ k8s_resource('gateway', resource_deps=['fastapi', 'detection-ui'],
 # Group uncategorized resources (configmaps, namespace) under infra
 k8s_resource(
     objects=['mpr:namespace', 'mpr-config:configmap', 'minio-config:configmap',
-             'postgres-config:configmap', 'envoy-gateway-config:configmap'],
+             'postgres-config:configmap', 'envoy-gateway-config:configmap',
+             'minio-data:persistentvolumeclaim'],
     new_name='infra',
 )
diff --git a/ctrl/k8s/base/configmap.yaml b/ctrl/k8s/base/configmap.yaml
index 08b6304..30b5f7f 100644
--- a/ctrl/k8s/base/configmap.yaml
+++ b/ctrl/k8s/base/configmap.yaml
@@ -8,4 +8,5 @@ data:
   DEBUG: "1"
   FASTAPI_PORT: "8702"
   DETECTION_UI_PORT: "5175"
+  INFERENCE_URL: ""
   GATEWAY_PORT: "8080"
diff --git a/ctrl/k8s/kind-config.yaml.tpl b/ctrl/k8s/kind-config.yaml.tpl
index 3b45655..6eaf74c 100644
--- a/ctrl/k8s/kind-config.yaml.tpl
+++ b/ctrl/k8s/kind-config.yaml.tpl
@@ -7,6 +7,14 @@ nodes:
   - containerPort: 30080
     hostPort: 80
     protocol: TCP
+  - containerPort: 30379
+    hostPort: 6379
+    listenAddress: "0.0.0.0"
+    protocol: TCP
+  - containerPort: 30432
+    hostPort: 5432
+    listenAddress: "0.0.0.0"
+    protocol: TCP
   extraMounts:
   - hostPath: ${MEDIA_HOST_PATH}
     containerPath: /mnt/media
diff --git a/ctrl/k8s/overlays/dev/kustomization.yaml b/ctrl/k8s/overlays/dev/kustomization.yaml
index f35722f..d87b72d 100644
--- a/ctrl/k8s/overlays/dev/kustomization.yaml
+++ b/ctrl/k8s/overlays/dev/kustomization.yaml
@@ -5,6 +5,9 @@ resources:
   - ../../base
   - minio-pvc.yaml
 
+patchesStrategicMerge:
+  - local-config.yaml
+
 patches:
   # Gateway as NodePort for local access
   - target:
@@ -30,6 +33,18 @@ patches:
         path: /spec/ports/0/nodePort
         value: 30379
 
+  # Postgres as NodePort for external access
+  - target:
+      kind: Service
+      name: postgres
+    patch: |
+      - op: replace
+        path: /spec/type
+        value: NodePort
+      - op: add
+        path: /spec/ports/0/nodePort
+        value: 30432
+
   # MinIO with persistent storage + host media mount for seeding.
   # PV survives pod restarts. Host mount is read-only for mc mirror seeding.
   # Requires kind cluster created with MEDIA_HOST_PATH extraMount (see kind-create.sh).
diff --git a/detect/emit.py b/detect/emit.py
index 8008085..6b0b0af 100644
--- a/detect/emit.py
+++ b/detect/emit.py
@@ -6,6 +6,9 @@ Stages call these instead of constructing dicts or dataclasses directly.
 
 Run context (run_id, parent_job_id) is set once at pipeline start via
 set_run_context() and automatically injected into all events.
+
+Log level is set per-run with optional per-stage overrides.
+DEBUG events are only pushed when the run (or stage) log level allows it.
 """
 
 from __future__ import annotations
@@ -16,23 +19,53 @@ from datetime import datetime, timezone
 from detect.events import push_detect_event
 from detect.models import PipelineStats
 
+# Log level ordering for comparison
+_LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}
+
 # Module-level run context — set once per pipeline invocation
 _run_context: dict = {}
+_run_log_level: str = "INFO"
+_stage_log_levels: dict[str, str] = {}  # stage_name → level override
 
 
-def set_run_context(run_id: str = "", parent_job_id: str = "", run_type: str = "initial"):
+def set_run_context(
+    run_id: str = "",
+    parent_job_id: str = "",
+    run_type: str = "initial",
+    log_level: str = "INFO",
+):
     """Set the run context for all subsequent events in this pipeline invocation."""
-    global _run_context
+    global _run_context, _run_log_level
     _run_context = {
         "run_id": run_id,
         "parent_job_id": parent_job_id,
         "run_type": run_type,
     }
+    _run_log_level = log_level.upper()
+    _stage_log_levels.clear()
+
+
+def set_stage_log_level(stage: str, level: str):
+    """Override log level for a specific stage."""
+    _stage_log_levels[stage] = level.upper()
+
+
+def clear_stage_log_level(stage: str):
+    """Remove per-stage log level override."""
+    _stage_log_levels.pop(stage, None)
 
 
 def clear_run_context():
-    global _run_context
+    global _run_context, _run_log_level
     _run_context = {}
+    _run_log_level = "INFO"
+    _stage_log_levels.clear()
+
+
+def _should_emit(level: str, stage: str) -> bool:
+    """Check if this log level should be emitted given run/stage settings."""
+    effective = _stage_log_levels.get(stage, _run_log_level)
+    return _LEVEL_ORDER.get(level.upper(), 1) >= _LEVEL_ORDER.get(effective, 1)
 
 
 def _inject_context(payload: dict) -> dict:
@@ -45,6 +78,8 @@ def _inject_context(payload: dict) -> dict:
 def log(job_id: str | None, stage: str, level: str, msg: str) -> None:
     if not job_id:
         return
+    if not _should_emit(level, stage):
+        return
     payload = {
         "level": level,
         "stage": stage,
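For reference, a short usage sketch of the gating that set_run_context() and set_stage_log_level() now provide (function names are real; the run and job ids are placeholders, and pushing events still requires the Redis-backed event store to be reachable):

```python
from detect import emit

emit.set_run_context(run_id="run-1", log_level="INFO")

emit.log("job-1", "SceneFilter", "DEBUG", "suppressed")   # DEBUG < INFO, not pushed
emit.log("job-1", "SceneFilter", "INFO", "pushed")

# Make a single stage verbose without touching the rest of the run.
emit.set_stage_log_level("OCRStage", "DEBUG")
emit.log("job-1", "OCRStage", "DEBUG", "pushed now")
emit.clear_stage_log_level("OCRStage")
```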
""" from __future__ import annotations @@ -16,23 +19,53 @@ from datetime import datetime, timezone from detect.events import push_detect_event from detect.models import PipelineStats +# Log level ordering for comparison +_LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3} + # Module-level run context — set once per pipeline invocation _run_context: dict = {} +_run_log_level: str = "INFO" +_stage_log_levels: dict[str, str] = {} # stage_name → level override -def set_run_context(run_id: str = "", parent_job_id: str = "", run_type: str = "initial"): +def set_run_context( + run_id: str = "", + parent_job_id: str = "", + run_type: str = "initial", + log_level: str = "INFO", +): """Set the run context for all subsequent events in this pipeline invocation.""" - global _run_context + global _run_context, _run_log_level _run_context = { "run_id": run_id, "parent_job_id": parent_job_id, "run_type": run_type, } + _run_log_level = log_level.upper() + _stage_log_levels.clear() + + +def set_stage_log_level(stage: str, level: str): + """Override log level for a specific stage.""" + _stage_log_levels[stage] = level.upper() + + +def clear_stage_log_level(stage: str): + """Remove per-stage log level override.""" + _stage_log_levels.pop(stage, None) def clear_run_context(): - global _run_context + global _run_context, _run_log_level _run_context = {} + _run_log_level = "INFO" + _stage_log_levels.clear() + + +def _should_emit(level: str, stage: str) -> bool: + """Check if this log level should be emitted given run/stage settings.""" + effective = _stage_log_levels.get(stage, _run_log_level) + return _LEVEL_ORDER.get(level.upper(), 1) >= _LEVEL_ORDER.get(effective, 1) def _inject_context(payload: dict) -> dict: @@ -45,6 +78,8 @@ def _inject_context(payload: dict) -> dict: def log(job_id: str | None, stage: str, level: str, msg: str) -> None: if not job_id: return + if not _should_emit(level, stage): + return payload = { "level": level, "stage": stage, diff --git a/detect/graph.py b/detect/graph.py index f6017ed..cac13d8 100644 --- a/detect/graph.py +++ b/detect/graph.py @@ -236,7 +236,8 @@ def node_escalate_vlm(state: DetectState) -> dict: existing = state.get("detections", []) - _emit_transition(state, "escalate_vlm", "done") + vlm_skipped = os.environ.get("SKIP_VLM", "").strip() == "1" + _emit_transition(state, "escalate_vlm", "skipped" if vlm_skipped else "done") return { "detections": existing + vlm_matched, "unresolved_candidates": still_unresolved, @@ -268,7 +269,8 @@ def node_escalate_cloud(state: DetectState) -> dict: existing = state.get("detections", []) - _emit_transition(state, "escalate_cloud", "done") + cloud_skipped = os.environ.get("SKIP_CLOUD", "").strip() == "1" + _emit_transition(state, "escalate_cloud", "skipped" if cloud_skipped else "done") return {"detections": existing + cloud_matched, "stats": stats} @@ -302,11 +304,33 @@ _CHECKPOINT_ENABLED = os.environ.get("MPR_CHECKPOINT", "").strip() == "1" _frames_manifest: dict[str, dict[int, str]] = {} # job_id → manifest (cached per job) +class PipelineCancelled(Exception): + """Raised when a pipeline run is cancelled.""" + pass + + +# Cancellation hook — set by the run endpoint, checked before each node +_cancel_check: dict[str, callable] = {} + + +def set_cancel_check(job_id: str, fn): + _cancel_check[job_id] = fn + + +def clear_cancel_check(job_id: str): + _cancel_check.pop(job_id, None) + + def _checkpointing_node(node_name: str, node_fn): """Wrap a node function to auto-checkpoint after completion.""" stage_index = 
diff --git a/detect/inference/client.py b/detect/inference/client.py
index 92db79c..f75db73 100644
--- a/detect/inference/client.py
+++ b/detect/inference/client.py
@@ -34,10 +34,16 @@ def _encode_image(image: np.ndarray) -> str:
 class InferenceClient:
     """HTTP client for the GPU inference server."""
 
-    def __init__(self, base_url: str | None = None, timeout: float = 60.0):
+    def __init__(self, base_url: str | None = None, timeout: float = 60.0,
+                 job_id: str = "", log_level: str = "INFO"):
         self.base_url = (base_url or DEFAULT_URL).rstrip("/")
         self.timeout = timeout
+        self.job_id = job_id
+        self.log_level = log_level
         self.session = requests.Session()
+        if job_id:
+            self.session.headers["X-Job-Id"] = job_id
+            self.session.headers["X-Log-Level"] = log_level
 
     def health(self) -> ServerStatus:
         """Check server health and loaded models."""
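A sketch of the job-context propagation this enables (InferenceClient and its constructor are from this file; the inference URL is a placeholder). Every request carries X-Job-Id and X-Log-Level, which the GPU server reads back in `_job_ctx()` (see gpu/server.py below) so its timing logs land on the same job's event stream:

```python
from detect.inference import InferenceClient

client = InferenceClient(
    base_url="http://gpu-host:9000",   # placeholder endpoint
    job_id="job-123",
    log_level="DEBUG",
)
status = client.health()               # sent with X-Job-Id / X-Log-Level headers
```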
diff --git a/detect/stages/frame_extractor.py b/detect/stages/frame_extractor.py
index 9ccf2f4..c4e5d4b 100644
--- a/detect/stages/frame_extractor.py
+++ b/detect/stages/frame_extractor.py
@@ -8,6 +8,7 @@ Emits log + stats_update SSE events as it works.
 from __future__ import annotations
 
 import tempfile
+import time
 from pathlib import Path
 
 import ffmpeg
@@ -53,6 +54,8 @@ def extract_frames(
     emit.log(job_id, "FrameExtractor", "INFO",
              f"Starting extraction: {Path(video_path).name} "
              f"({duration:.1f}s, {probe.width}x{probe.height}, fps={config.fps})")
+    emit.log(job_id, "FrameExtractor", "DEBUG",
+             f"Probe: codec={probe.video_codec}, bitrate={probe.video_bitrate}, max_frames={config.max_frames}")
 
     with tempfile.TemporaryDirectory() as tmpdir:
         pattern = str(Path(tmpdir) / "frame_%06d.jpg")
@@ -65,14 +68,24 @@ def extract_frames(
             .overwrite_output()
         )
 
+        t0 = time.monotonic()
         try:
             stream.run(capture_stdout=True, capture_stderr=True, quiet=True)
         except ffmpeg.Error as e:
             stderr = e.stderr.decode() if e.stderr else "unknown error"
             emit.log(job_id, "FrameExtractor", "ERROR", f"FFmpeg failed: {stderr[:200]}")
             raise RuntimeError(f"FFmpeg failed: {stderr}") from e
+        ffmpeg_ms = (time.monotonic() - t0) * 1000
+        emit.log(job_id, "FrameExtractor", "DEBUG", f"FFmpeg decode: {ffmpeg_ms:.0f}ms")
 
+        t0 = time.monotonic()
         frames = _load_frames(Path(tmpdir), config.fps)
+        load_ms = (time.monotonic() - t0) * 1000
+        if frames:
+            h, w = frames[0].image.shape[:2]
+            mem_mb = sum(f.image.nbytes for f in frames) / (1024 * 1024)
+            emit.log(job_id, "FrameExtractor", "DEBUG",
+                     f"Loaded {len(frames)} frames ({w}x{h}) in {load_ms:.0f}ms, {mem_mb:.1f}MB in memory")
 
     emit.log(job_id, "FrameExtractor", "INFO", f"Extracted {len(frames)} frames")
     emit.stats(job_id, frames_extracted=len(frames))
diff --git a/detect/stages/ocr_stage.py b/detect/stages/ocr_stage.py
index 4a7484c..608e038 100644
--- a/detect/stages/ocr_stage.py
+++ b/detect/stages/ocr_stage.py
@@ -13,6 +13,7 @@ Model instances are cached at module level so they survive across pipeline runs.
 from __future__ import annotations
 
 import logging
+import time
 from typing import TYPE_CHECKING
 
 import numpy as np
@@ -91,7 +92,8 @@ def run_ocr(
     # Build these once per pipeline run, not per crop
     if inference_url:
         from detect.inference import InferenceClient
-        client = InferenceClient(base_url=inference_url)
+        from detect.emit import _run_log_level
+        client = InferenceClient(base_url=inference_url, job_id=job_id or "", log_level=_run_log_level)
     else:
         model = _get_local_model(config.languages[0])
 
@@ -108,12 +110,19 @@ def run_ocr(
             if crop.size == 0:
                 continue
 
+            t0 = time.monotonic()
             if inference_url:
                 raw_results = client.ocr(image=crop, languages=config.languages)
                 texts = [{"text": r.text, "confidence": r.confidence} for r in raw_results]
             else:
                 raw = model.ocr(crop)
                 texts = _parse_ocr_raw(raw, config.min_confidence)
+            ocr_ms = (time.monotonic() - t0) * 1000
+
+            h, w = crop.shape[:2]
+            text_preview = ", ".join(t["text"][:30] for t in texts) if texts else "(none)"
+            emit.log(job_id, "OCRStage", "DEBUG",
+                     f"Frame {seq} box {box.x},{box.y} ({w}x{h}): {ocr_ms:.0f}ms → {text_preview}")
 
             for t in texts:
                 candidates.append(TextCandidate(
diff --git a/detect/stages/scene_filter.py b/detect/stages/scene_filter.py
index 1eda72e..cdeb197 100644
--- a/detect/stages/scene_filter.py
+++ b/detect/stages/scene_filter.py
@@ -9,6 +9,8 @@ CV stages without losing unique visual content.
 
 from __future__ import annotations
 
+import time
+
 import imagehash
 from PIL import Image
 
@@ -63,8 +65,16 @@ def scene_filter(
     emit.log(job_id, "SceneFilter", "INFO",
              f"Filtering {len(frames)} frames (hamming_threshold={config.hamming_threshold})")
 
+    t0 = time.monotonic()
     hashes = _compute_hashes(frames)
+    hash_ms = (time.monotonic() - t0) * 1000
+    emit.log(job_id, "SceneFilter", "DEBUG",
+             f"Computed {len(hashes)} perceptual hashes in {hash_ms:.0f}ms ({hash_ms/max(len(hashes),1):.1f}ms/frame)")
+
+    t0 = time.monotonic()
     kept = _dedup(frames, hashes, config.hamming_threshold)
+    dedup_ms = (time.monotonic() - t0) * 1000
+    emit.log(job_id, "SceneFilter", "DEBUG", f"Dedup pass: {dedup_ms:.0f}ms")
 
     dropped = len(frames) - len(kept)
     pct = (dropped / len(frames) * 100) if frames else 0
diff --git a/detect/stages/vlm_cloud.py b/detect/stages/vlm_cloud.py
index 8fc912b..df721e4 100644
--- a/detect/stages/vlm_cloud.py
+++ b/detect/stages/vlm_cloud.py
@@ -13,6 +13,8 @@ from __future__ import annotations
 import base64
 import io
 import logging
+import os
+import time
 
 import numpy as np
 from PIL import Image
@@ -108,6 +110,11 @@ def escalate_cloud(
     if not candidates:
         return []
 
+    if os.environ.get("SKIP_CLOUD", "").strip() == "1":
+        emit.log(job_id, "CloudLLM", "INFO",
+                 f"SKIP_CLOUD=1, skipping {len(candidates)} crops")
+        return []
+
     if not has_api_key():
         emit.log(job_id, "CloudLLM", "WARNING",
                  f"No API key set for cloud provider, skipping {len(candidates)} crops")
@@ -120,7 +127,7 @@ def escalate_cloud(
     matched: list[BrandDetection] = []
     total_cost = 0.0
 
-    for candidate in candidates:
+    for i, candidate in enumerate(candidates):
         crop = _crop_image(candidate)
         if crop.size == 0:
             continue
@@ -133,11 +140,15 @@ def escalate_cloud(
         prompt = vlm_prompt_fn(crop_context)
         image_b64 = _encode_crop(crop)
 
+        t0 = time.monotonic()
         try:
             result = _call_cloud_api(image_b64, prompt)
         except Exception as e:
-            logger.warning("Cloud LLM failed for '%s': %s", candidate.text, e)
+            call_ms = (time.monotonic() - t0) * 1000
+            emit.log(job_id, "CloudLLM", "DEBUG",
+                     f"[{i+1}/{len(candidates)}] FAILED '{candidate.text[:30]}': {e} ({call_ms:.0f}ms)")
             continue
+        call_ms = (time.monotonic() - t0) * 1000
 
         stats.cloud_llm_calls += 1
         model_info = provider.models.get(provider.model)
@@ -148,6 +159,11 @@
         brand = result["brand"]
         confidence = result["confidence"]
 
+        emit.log(job_id, "CloudLLM", "DEBUG",
+                 f"[{i+1}/{len(candidates)}] '{candidate.text[:30]}' → "
+                 f"{'✓ ' + brand if brand else '✗'} "
+                 f"(conf={confidence:.2f}, {result['tokens']}tok, ${call_cost:.4f}, {call_ms:.0f}ms)")
+
         if brand and confidence >= min_confidence:
             detection = BrandDetection(
                 brand=brand,
diff --git a/detect/stages/vlm_local.py b/detect/stages/vlm_local.py
index af06107..2e8e1aa 100644
--- a/detect/stages/vlm_local.py
+++ b/detect/stages/vlm_local.py
@@ -9,6 +9,8 @@ objects for crops the VLM can identify.
 from __future__ import annotations
 
 import logging
+import os
+import time
 
 import numpy as np
 
@@ -61,6 +63,11 @@ def escalate_vlm(
     if not candidates:
         return [], []
 
+    if os.environ.get("SKIP_VLM", "").strip() == "1":
+        emit.log(job_id, "VLMLocal", "INFO",
+                 f"SKIP_VLM=1, skipping {len(candidates)} crops")
+        return [], candidates
+
     emit.log(job_id, "VLMLocal", "INFO",
              f"Processing {len(candidates)} unresolved crops with moondream2")
 
@@ -69,9 +76,10 @@ def escalate_vlm(
 
     if inference_url:
         from detect.inference import InferenceClient
-        client = InferenceClient(base_url=inference_url)
+        from detect.emit import _run_log_level
+        client = InferenceClient(base_url=inference_url, job_id=job_id or "", log_level=_run_log_level)
 
-    for candidate in candidates:
+    for i, candidate in enumerate(candidates):
         crop = _crop_image(candidate)
         if crop.size == 0:
             still_unresolved.append(candidate)
@@ -84,6 +92,7 @@ def escalate_vlm(
         )
         prompt = vlm_prompt_fn(crop_context)
 
+        t0 = time.monotonic()
         try:
             if inference_url:
                 result = client.vlm(image=crop, prompt=prompt)
@@ -93,9 +102,16 @@ def escalate_vlm(
             else:
                 brand, confidence, reasoning = _vlm_local(crop, prompt)
         except Exception as e:
-            logger.warning("VLM failed for candidate '%s': %s", candidate.text, e)
+            vlm_ms = (time.monotonic() - t0) * 1000
+            emit.log(job_id, "VLMLocal", "DEBUG",
+                     f"[{i+1}/{len(candidates)}] FAILED '{candidate.text[:30]}': {e} ({vlm_ms:.0f}ms)")
             still_unresolved.append(candidate)
             continue
+        vlm_ms = (time.monotonic() - t0) * 1000
+        emit.log(job_id, "VLMLocal", "DEBUG",
+                 f"[{i+1}/{len(candidates)}] '{candidate.text[:30]}' → "
+                 f"{'✓ ' + brand if brand else '✗ unresolved'} "
+                 f"(conf={confidence:.2f}, {vlm_ms:.0f}ms)")
 
         if brand and confidence >= min_confidence:
             detection = BrandDetection(
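The two skip flags short-circuit their stages and change the node status the graph reports. A tiny sketch of that contract (the env var names and the "skipped"/"done" statuses come from the diffs above; `stage_status` itself is illustrative, not a real helper):

```python
import os

def stage_status(node: str) -> str:
    # Mirrors the _emit_transition logic in detect/graph.py
    flag = {"escalate_vlm": "SKIP_VLM", "escalate_cloud": "SKIP_CLOUD"}[node]
    return "skipped" if os.environ.get(flag, "").strip() == "1" else "done"

os.environ["SKIP_VLM"] = "1"            # what the run endpoint sets for req.skip_vlm
print(stage_status("escalate_vlm"))     # skipped: escalate_vlm returns ([], candidates)
print(stage_status("escalate_cloud"))   # done (unless SKIP_CLOUD=1)
```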
diff --git a/detect/stages/yolo_detector.py b/detect/stages/yolo_detector.py
index de3a5c1..bdb8902 100644
--- a/detect/stages/yolo_detector.py
+++ b/detect/stages/yolo_detector.py
@@ -14,6 +14,7 @@ from __future__ import annotations
 import base64
 import io
 import logging
+import time
 
 from PIL import Image
 
@@ -32,10 +33,11 @@ def _frame_to_b64(frame: Frame) -> str:
     return base64.b64encode(buf.getvalue()).decode()
 
 
-def _detect_remote(frame: Frame, config: DetectionConfig, inference_url: str) -> list[BoundingBox]:
+def _detect_remote(frame: Frame, config: DetectionConfig, inference_url: str,
+                   job_id: str = "", log_level: str = "INFO") -> list[BoundingBox]:
     """Call the inference server over HTTP."""
     from detect.inference import InferenceClient
-    client = InferenceClient(base_url=inference_url)
+    client = InferenceClient(base_url=inference_url, job_id=job_id, log_level=log_level)
     results = client.detect(
         image=frame.image,
         model=config.model_name,
@@ -99,15 +101,24 @@ def detect_objects(
     all_boxes: dict[int, list[BoundingBox]] = {}
     total_regions = 0
 
-    for frame in frames:
+    for i, frame in enumerate(frames):
+        t0 = time.monotonic()
         if inference_url:
-            boxes = _detect_remote(frame, config, inference_url)
+            from detect.emit import _run_log_level
+            boxes = _detect_remote(frame, config, inference_url,
+                                   job_id=job_id or "", log_level=_run_log_level)
         else:
             boxes = _detect_local(frame, config)
+        det_ms = (time.monotonic() - t0) * 1000
 
         all_boxes[frame.sequence] = boxes
         total_regions += len(boxes)
 
+        emit.log(job_id, "YOLODetector", "DEBUG",
+                 f"Frame {frame.sequence}: {len(boxes)} regions in {det_ms:.0f}ms"
+                 f" [{', '.join(b.label for b in boxes)}]" if boxes else
+                 f"Frame {frame.sequence}: 0 regions in {det_ms:.0f}ms")
+
         if boxes and job_id:
             box_dicts = [{"x": b.x, "y": b.y, "w": b.w, "h": b.h,
                           "confidence": b.confidence, "label": b.label}
diff --git a/gpu/emit.py b/gpu/emit.py
new file mode 100644
index 0000000..de4d296
--- /dev/null
+++ b/gpu/emit.py
@@ -0,0 +1,52 @@
+"""
+Lightweight event emitter for the GPU inference server.
+
+Pushes debug logs to the same Redis stream as the pipeline orchestrator,
+so GPU-side details (model load, VRAM, inference timing) appear in the
+same log panel.
+
+Only active when the request includes X-Job-Id header.
+No dependency on the detect package.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+from datetime import datetime, timezone
+
+import redis
+
+REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
+EVENTS_PREFIX = "detect_events"
+
+_LEVEL_ORDER = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}
+
+_redis_client = None
+
+
+def _get_redis():
+    global _redis_client
+    if _redis_client is None:
+        _redis_client = redis.from_url(REDIS_URL, decode_responses=True)
+    return _redis_client
+
+
+def log(job_id: str, stage: str, level: str, msg: str, log_level: str = "INFO"):
+    """Push a log event to Redis if the level meets the threshold."""
+    if not job_id:
+        return
+    if _LEVEL_ORDER.get(level.upper(), 1) < _LEVEL_ORDER.get(log_level.upper(), 1):
+        return
+
+    r = _get_redis()
+    key = f"{EVENTS_PREFIX}:{job_id}"
+    event = json.dumps({
+        "event": "log",
+        "level": level,
+        "stage": stage,
+        "msg": msg,
+        "ts": datetime.now(timezone.utc).isoformat(),
+    })
+    r.rpush(key, event)
+    r.expire(key, 3600)
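What lands on the shared list can be inspected directly. A sketch, assuming a local Redis and a placeholder job id (the key shape and payload fields are the ones gpu/emit.py writes; the orchestrator reads the same list through its poll_events helper):

```python
import json
import redis

r = redis.from_url("redis://localhost:6379/0", decode_responses=True)
for raw in r.lrange("detect_events:job-123", 0, -1):   # placeholder job id
    event = json.loads(raw)
    print(event["ts"], event["stage"], event["level"], event["msg"])
```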
diff --git a/gpu/requirements.txt b/gpu/requirements.txt
index 0fc2427..7846841 100644
--- a/gpu/requirements.txt
+++ b/gpu/requirements.txt
@@ -2,6 +2,7 @@ fastapi>=0.109.0
 uvicorn[standard]>=0.27.0
 rapidfuzz>=3.0.0
 Pillow>=10.0.0
+redis>=5.0.0
 
 # --- GPU-specific installs (mcrn: RTX 3080, CUDA toolkit 12.8) --- #
diff --git a/gpu/server.py b/gpu/server.py
index c23df6c..5c22138 100644
--- a/gpu/server.py
+++ b/gpu/server.py
@@ -14,13 +14,16 @@ import base64
 import io
 import logging
 import os
+import time
 from contextlib import asynccontextmanager
 
 import numpy as np
-from fastapi import FastAPI, HTTPException
+from fastapi import FastAPI, HTTPException, Request
 from PIL import Image
 from pydantic import BaseModel
 
+from emit import log as emit_log
+
 from config import get_config, get_device, update_config
 from models import registry
 from models.yolo import detect as yolo_detect
@@ -36,6 +39,19 @@ def _decode_image(b64: str) -> np.ndarray:
     return np.array(img)
 
 
+def _job_ctx(request: Request) -> tuple[str, str]:
+    """Extract job_id and log_level from request headers."""
+    job_id = request.headers.get("x-job-id", "")
+    log_level = request.headers.get("x-log-level", "INFO")
+    return job_id, log_level
+
+
+def _gpu_log(job_id: str, log_level: str, stage: str, level: str, msg: str):
+    """Emit a log event if job context is present."""
+    if job_id:
+        emit_log(job_id, stage, level, msg, log_level=log_level)
+
+
 # --- Request/Response models ---
 
 class DetectRequest(BaseModel):
@@ -160,19 +176,31 @@ def unload_model(body: dict):
 
 @app.post("/detect", response_model=DetectResponse)
-def detect(req: DetectRequest):
+def detect(req: DetectRequest, request: Request):
+    job_id, log_level = _job_ctx(request)
+
     try:
+        t0 = time.monotonic()
         image = _decode_image(req.image)
+        decode_ms = (time.monotonic() - t0) * 1000
+        h, w = image.shape[:2]
+        _gpu_log(job_id, log_level, "GPU:YOLO", "DEBUG",
+                 f"Decoded {w}x{h} image in {decode_ms:.0f}ms")
     except Exception as e:
         raise HTTPException(status_code=400, detail=f"Bad image: {e}")
 
     try:
+        t0 = time.monotonic()
         results = yolo_detect(
             image,
             model_name=req.model,
             confidence=req.confidence,
             target_classes=req.target_classes,
         )
+        infer_ms = (time.monotonic() - t0) * 1000
+        _gpu_log(job_id, log_level, "GPU:YOLO", "DEBUG",
+                 f"Inference: {len(results)} detections in {infer_ms:.0f}ms "
+                 f"(model={req.model}, conf={req.confidence})")
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"Detection failed: {e}")
 
@@ -180,14 +208,22 @@ def detect(req: DetectRequest):
 
 @app.post("/ocr", response_model=OCRResponse)
-def ocr(req: OCRRequest):
+def ocr(req: OCRRequest, request: Request):
+    job_id, log_level = _job_ctx(request)
+
     try:
         image = _decode_image(req.image)
+        h, w = image.shape[:2]
     except Exception as e:
         raise HTTPException(status_code=400, detail=f"Bad image: {e}")
 
     try:
+        t0 = time.monotonic()
         results = ocr_run(image, languages=req.languages)
+        infer_ms = (time.monotonic() - t0) * 1000
+        texts = [r["text"][:20] for r in results]
+        _gpu_log(job_id, log_level, "GPU:OCR", "DEBUG",
+                 f"OCR {w}x{h}: {infer_ms:.0f}ms → {len(results)} results {texts}")
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"OCR failed: {e}")
 
@@ -223,14 +259,22 @@ def preprocess_image(req: PreprocessRequest):
 
 @app.post("/vlm", response_model=VLMResponse)
-def vlm(req: VLMRequest):
+def vlm(req: VLMRequest, request: Request):
+    job_id, log_level = _job_ctx(request)
+
     try:
         image = _decode_image(req.image)
+        h, w = image.shape[:2]
     except Exception as e:
         raise HTTPException(status_code=400, detail=f"Bad image: {e}")
 
     try:
+        t0 = time.monotonic()
         result = vlm_query(image, req.prompt)
+        infer_ms = (time.monotonic() - t0) * 1000
+        _gpu_log(job_id, log_level, "GPU:VLM", "DEBUG",
+                 f"VLM {w}x{h}: {infer_ms:.0f}ms → "
+                 f"brand='{result.get('brand', '')}' conf={result.get('confidence', 0):.2f}")
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"VLM failed: {e}")
diff --git a/ui/common/types/generated.ts b/ui/common/types/generated.ts
index 722d02e..a70cf6b 100644
--- a/ui/common/types/generated.ts
+++ b/ui/common/types/generated.ts
@@ -170,19 +170,6 @@ export interface SourceBrandSighting {
   created_at: string | null;
 }
 
-export interface SourceJob {
-  job_id: string;
-  source_type: string;
-  chunk_count: number;
-  total_bytes: number;
-}
-
-export interface ChunkInfo {
-  filename: string;
-  key: string;
-  size_bytes: number;
-}
-
 export interface CreateJobRequest {
   source_asset_id: string;
   preset_id: string | null;
@@ -220,6 +207,19 @@ export interface WorkerStatus {
   gpu_available: boolean;
 }
 
+export interface SourceJob {
+  job_id: string;
+  source_type: string;
+  chunk_count: number;
+  total_bytes: number;
+}
+
+export interface ChunkInfo {
+  filename: string;
+  key: string;
+  size_bytes: number;
+}
+
 export interface ChunkEvent {
   sequence: number;
   status: string;
diff --git a/ui/detection-app/src/App.vue b/ui/detection-app/src/App.vue
index 09cb3aa..793a144 100644
--- a/ui/detection-app/src/App.vue
+++ b/ui/detection-app/src/App.vue
@@ -9,19 +9,27 @@ import FramePanel from './panels/FramePanel.vue'
 import BrandTablePanel from './panels/BrandTablePanel.vue'
 import TimelinePanel from './panels/TimelinePanel.vue'
 import CostStatsPanel from './panels/CostStatsPanel.vue'
+import SourceSelector from './panels/SourceSelector.vue'
 import type { StatsUpdate, RunContext } from './types/sse-contract'
 import { usePipelineStore } from './stores/pipeline'
 
 const pipeline = usePipelineStore()
 
-const jobId = ref(new URLSearchParams(window.location.search).get('job') || 'test-job')
+const jobParam = new URLSearchParams(window.location.search).get('job')
+const jobId = ref(jobParam || '')
 const stats = ref(null)
 const runContext = ref(null)
 const status = ref<'idle' | 'live' | 'processing' | 'error'>('idle')
+const logPanel = ref<{ clear: () => void } | null>(null)
+
+// No job selected → open source selector
+if (!jobParam) {
+  pipeline.openSourceSelector()
+}
 
 const source = new SSEDataSource({
   id: 'detect-stream',
-  url: `/api/detect/stream/${jobId.value}`,
+  url: jobId.value ? `/api/detect/stream/${jobId.value}` : '',
   eventTypes: ['graph_update', 'stats_update', 'frame_update',
                'detection', 'log', 'job_complete', 'waiting'],
 })
@@ -37,6 +45,12 @@ source.on('stats_update', (e) => {
   }
 })
 
+source.on<{ report?: { status?: string, error?: string } }>('job_complete', (e) => {
+  if (e.report?.status === 'failed') {
+    status.value = 'error'
+  }
+})
+
 // Resizable splits
 const pipelineWidth = ref(320)
 const detectionsFlex = ref(3) // ratio for detections vs stats
@@ -71,7 +85,36 @@ const statusMap: Record = {
 
 const checkStatus = () => {
   status.value = statusMap[source.status.value] ?? 'idle'
 }
 setInterval(checkStatus, 500)
-source.connect()
+if (jobId.value) {
+  source.connect()
+}
+
+async function stopPipeline() {
+  if (!jobId.value) return
+  try {
+    await fetch(`/api/detect/stop/${jobId.value}`, { method: 'POST' })
+  } catch { /* ignore — UI will see the cancel event via SSE */ }
+}
+
+function onJobStarted(newJobId: string) {
+  jobId.value = newJobId
+  // Reset UI state
+  stats.value = null
+  runContext.value = null
+  status.value = 'processing'
+  logPanel.value?.clear()
+  pipeline.reset()
+  pipeline.setStatus('running')
+  // Update URL without reload
+  const url = new URL(window.location.href)
+  url.searchParams.set('job', newJobId)
+  window.history.pushState({}, '', url.toString())
+  // Connect SSE to new job
+  source.disconnect()
+  source.setUrl(`/api/detect/stream/${newJobId}`)
+  source.connect()
+  // Switch to normal layout (reset sets it to normal already)
+}
+
+
+
@@ -183,7 +242,7 @@
@@ -200,12 +259,11 @@ body {
 }
 
 .app {
-  height: 100vh;
+  min-height: 100vh;
   display: grid;
   grid-template-rows: auto 1fr auto;
   padding: var(--space-4);
   gap: var(--space-2);
-  overflow: hidden;
 }
 
 header {
@@ -235,6 +293,34 @@ header h1 { font-size: var(--font-size-lg); font-weight: 600; }
   font-size: var(--font-size-sm);
 }
 
+.header-btn {
+  background: var(--surface-2);
+  border: 1px solid var(--surface-3);
+  border-radius: 4px;
+  color: var(--text-secondary);
+  width: 28px;
+  height: 28px;
+  cursor: pointer;
+  display: flex;
+  align-items: center;
+  justify-content: center;
+  transition: all 0.15s;
+}
+.header-btn:hover {
+  background: var(--surface-3);
+  color: var(--text-primary);
+}
+.stop-btn {
+  background: var(--status-error);
+  color: #000;
+  font-size: 12px;
+  font-weight: 700;
+}
+.stop-btn:hover {
+  opacity: 0.8;
+}
+
 .job-id { color: var(--text-dim); font-size: var(--font-size-sm); margin-left: auto; }
 
 /* Main layout: pipeline left, content right — both same height */
@@ -328,11 +414,10 @@ header h1 { font-size: var(--font-size-lg); font-weight: 600; }
 .stat .label { color: var(--text-dim); font-size: var(--font-size-sm); }
 .stat .value { font-weight: 600; }
 
-/* Log: full width bottom, fixed height */
+/* Log: full width bottom */
 .log-row {
   flex-shrink: 0;
-  height: 160px;
-  overflow: hidden;
+  height: 200px;
 }
 
 .empty { color: var(--text-dim); padding: var(--space-6); text-align: center; }
@@ -399,4 +484,50 @@ header h1 { font-size: var(--font-size-lg); font-weight: 600; }
   text-align: center;
   font-size: var(--font-size-sm);
 }
+
+/* Source selector */
+.source-selector {
+  display: flex;
+  flex-direction: column;
+  height: 100%;
+  gap: var(--space-3);
+  padding: var(--space-3);
+}
+
+.source-info {
+  font-size: var(--font-size-sm);
+  color: var(--text-secondary);
+}
+
+.source-hint {
+  color: var(--text-dim);
+  margin-top: var(--space-1);
+}
+
+.source-hint code {
+  background: var(--surface-2);
+  padding: 1px 4px;
+  border-radius: 3px;
+}
+
+.source-list {
+  flex: 1;
+  overflow-y: auto;
+  background: var(--surface-2);
+  border-radius: var(--panel-radius);
+  padding: var(--space-2);
+}
+
+.source-loading {
+  color: var(--text-dim);
+  text-align: center;
+  padding: var(--space-4);
+  font-size: var(--font-size-sm);
+}
+
+.source-actions {
+  flex-shrink: 0;
+  display: flex;
+  justify-content: flex-end;
+}
diff --git a/ui/detection-app/src/panels/LogPanel.vue b/ui/detection-app/src/panels/LogPanel.vue
index f881577..a95a2af 100644
--- a/ui/detection-app/src/panels/LogPanel.vue
+++ b/ui/detection-app/src/panels/LogPanel.vue
@@ -21,6 +21,12 @@ props.source.on('log', (e) => {
     ts: e.ts,
   })
 })
+
+function clear() {
+  entries.value = []
+}
+
+defineExpose({ clear })
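For completeness, stopping a run from outside the UI only needs the same route App.vue's stopPipeline() calls. A sketch, with host, port and job id as placeholders:

```python
import requests

job_id = "job-123"                      # placeholder
requests.post(f"http://localhost:8080/api/detect/stop/{job_id}", timeout=10)
# The run endpoint adds the job to its cancelled set; the next node boundary
# raises PipelineCancelled and the SSE stream reports {"status": "cancelled"}.
```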