diff --git a/core/api/detect/config.py b/core/api/detect/config.py index 0ce70f7..ed12bf7 100644 --- a/core/api/detect/config.py +++ b/core/api/detect/config.py @@ -57,6 +57,13 @@ def write_config(update: ConfigUpdate): return _runtime_config +@router.get("/config/profiles") +def list_profiles(): + """List available detection profiles.""" + from detect.profiles import _PROFILES + return [{"name": name} for name in _PROFILES] + + @router.get("/config/stages", response_model=list[StageConfigInfo]) def list_stage_configs(): """Return the stage palette with config field metadata for the editor.""" diff --git a/core/api/detect/run.py b/core/api/detect/run.py index d3049d5..9b34bc3 100644 --- a/core/api/detect/run.py +++ b/core/api/detect/run.py @@ -1,9 +1,13 @@ """ Pipeline run endpoints. -POST /detect/run — launch pipeline on selected source -POST /detect/stop/{job_id} — cancel a running pipeline -POST /detect/clear/{job_id} — clear events from Redis +POST /detect/run — launch pipeline on selected source +POST /detect/stop/{job_id} — cancel a running pipeline +POST /detect/pause/{job_id} — pause after current stage +POST /detect/resume/{job_id} — resume a paused pipeline +POST /detect/step/{job_id} — run one stage then pause +POST /detect/clear/{job_id} — clear events from Redis +GET /detect/status/{job_id} — pipeline run status """ from __future__ import annotations @@ -33,6 +37,7 @@ class RunRequest(BaseModel): skip_vlm: bool = False skip_cloud: bool = False log_level: str = "INFO" # INFO | DEBUG + pause_after_stage: bool = False class RunResponse(BaseModel): @@ -92,9 +97,13 @@ def run_pipeline(req: RunRequest): source_asset_id=req.source_asset_id, ) - from detect.graph import PipelineCancelled, set_cancel_check, clear_cancel_check + from detect.graph import ( + PipelineCancelled, set_cancel_check, clear_cancel_check, + init_pause, clear_pause, + ) set_cancel_check(job_id, lambda: job_id in _cancelled_jobs) + init_pause(job_id, pause_after_stage=req.pause_after_stage) def _run(): try: @@ -123,6 +132,7 @@ def run_pipeline(req: RunRequest): _running_jobs.pop(job_id, None) _cancelled_jobs.discard(job_id) clear_cancel_check(job_id) + clear_pause(job_id) emit.clear_run_context() thread = threading.Thread(target=_run, daemon=True, name=f"pipeline-{job_id}") @@ -145,6 +155,75 @@ def stop_pipeline(job_id: str): return {"status": "stopping", "job_id": job_id} +@router.post("/pause/{job_id}") +def pause(job_id: str): + """Pause a running pipeline after the current stage completes.""" + from detect.graph import pause_pipeline + + if job_id not in _running_jobs: + raise HTTPException(status_code=404, detail=f"No running pipeline: {job_id}") + + pause_pipeline(job_id) + return {"status": "pausing", "job_id": job_id} + + +@router.post("/resume/{job_id}") +def resume(job_id: str): + """Resume a paused pipeline.""" + from detect.graph import resume_pipeline + + if job_id not in _running_jobs: + raise HTTPException(status_code=404, detail=f"No running pipeline: {job_id}") + + resume_pipeline(job_id) + return {"status": "running", "job_id": job_id} + + +@router.post("/step/{job_id}") +def step(job_id: str): + """Run one stage then pause again.""" + from detect.graph import step_pipeline + + if job_id not in _running_jobs: + raise HTTPException(status_code=404, detail=f"No running pipeline: {job_id}") + + step_pipeline(job_id) + return {"status": "stepping", "job_id": job_id} + + +@router.post("/pause-after-stage/{job_id}") +def toggle_pause_after_stage(job_id: str, enabled: bool = True): + """Toggle pause-after-each-stage mode.""" + from detect.graph import set_pause_after_stage + + if job_id not in _running_jobs: + raise HTTPException(status_code=404, detail=f"No running pipeline: {job_id}") + + set_pause_after_stage(job_id, enabled) + return {"status": "ok", "pause_after_stage": enabled, "job_id": job_id} + + +@router.get("/status/{job_id}") +def pipeline_status(job_id: str): + """Get pipeline run status.""" + from detect.graph import is_paused + + running = job_id in _running_jobs + paused = is_paused(job_id) + cancelling = job_id in _cancelled_jobs + + if cancelling: + status = "cancelling" + elif paused: + status = "paused" + elif running: + status = "running" + else: + status = "idle" + + return {"status": status, "job_id": job_id} + + @router.post("/clear/{job_id}") def clear_pipeline(job_id: str): """Clear events for a job from Redis.""" diff --git a/detect/graph/__init__.py b/detect/graph/__init__.py index f1d7073..48c5f5c 100644 --- a/detect/graph/__init__.py +++ b/detect/graph/__init__.py @@ -4,7 +4,7 @@ Detection pipeline graph. detect/graph/ nodes.py — node functions (one per stage) events.py — graph_update SSE emission - runner.py — pipeline execution (LangGraph wrapper, checkpoint, cancel) + runner.py — pipeline execution (LangGraph wrapper, checkpoint, cancel, pause) """ from .nodes import NODES, NODE_FUNCTIONS @@ -12,8 +12,15 @@ from .runner import ( PipelineCancelled, build_graph, clear_cancel_check, + clear_pause, get_pipeline, + init_pause, + is_paused, + pause_pipeline, + resume_pipeline, set_cancel_check, + set_pause_after_stage, + step_pipeline, ) from .events import _node_states @@ -25,5 +32,12 @@ __all__ = [ "get_pipeline", "set_cancel_check", "clear_cancel_check", + "init_pause", + "clear_pause", + "pause_pipeline", + "resume_pipeline", + "step_pipeline", + "set_pause_after_stage", + "is_paused", "_node_states", ] diff --git a/detect/graph/runner.py b/detect/graph/runner.py index 20997be..ed27ffc 100644 --- a/detect/graph/runner.py +++ b/detect/graph/runner.py @@ -7,13 +7,17 @@ custom runner in Phase 3, with an executor socket for distributed dispatch. from __future__ import annotations +import logging import os +import threading from langgraph.graph import END, StateGraph from detect.state import DetectState from .nodes import NODES, NODE_FUNCTIONS +logger = logging.getLogger(__name__) + # --- Checkpoint wrapper --- @@ -27,7 +31,15 @@ class PipelineCancelled(Exception): pass -# Cancellation hook — set by the run endpoint, checked before each node +class PipelinePaused(Exception): + """Raised when a pipeline is paused (internally, for flow control).""" + pass + + +# --------------------------------------------------------------------------- +# Cancellation — checked before each node +# --------------------------------------------------------------------------- + _cancel_check: dict[str, callable] = {} @@ -39,6 +51,92 @@ def clear_cancel_check(job_id: str): _cancel_check.pop(job_id, None) +# --------------------------------------------------------------------------- +# Pause / Resume / Step — checked after each node completes +# +# _pause_gate: threading.Event per job. When cleared, the runner blocks. +# When set, the runner proceeds to the next node. +# _pause_after_stage: if True, automatically clear the gate after each node. +# --------------------------------------------------------------------------- + +_pause_gate: dict[str, threading.Event] = {} +_pause_after_stage: dict[str, bool] = {} + + +def init_pause(job_id: str, pause_after_stage: bool = False): + """Initialize pause state for a job. Called when pipeline starts.""" + gate = threading.Event() + gate.set() # start unpaused + _pause_gate[job_id] = gate + _pause_after_stage[job_id] = pause_after_stage + + +def clear_pause(job_id: str): + """Clean up pause state. Called when pipeline finishes.""" + _pause_gate.pop(job_id, None) + _pause_after_stage.pop(job_id, None) + + +def pause_pipeline(job_id: str): + """Pause a running pipeline. It will block after the current stage completes.""" + gate = _pause_gate.get(job_id) + if gate: + gate.clear() + logger.info("Pipeline %s paused", job_id) + + +def resume_pipeline(job_id: str): + """Resume a paused pipeline.""" + gate = _pause_gate.get(job_id) + if gate: + gate.set() + logger.info("Pipeline %s resumed", job_id) + + +def step_pipeline(job_id: str): + """Run one stage then pause again.""" + _pause_after_stage[job_id] = True + gate = _pause_gate.get(job_id) + if gate: + gate.set() # unblock for one stage, _pause_after_stage re-pauses after + logger.info("Pipeline %s stepping", job_id) + + +def set_pause_after_stage(job_id: str, enabled: bool): + """Toggle pause-after-each-stage mode.""" + _pause_after_stage[job_id] = enabled + if not enabled: + # If disabling, also resume in case we're currently paused + gate = _pause_gate.get(job_id) + if gate: + gate.set() + + +def is_paused(job_id: str) -> bool: + """Check if a pipeline is currently paused.""" + gate = _pause_gate.get(job_id) + return gate is not None and not gate.is_set() + + +def _wait_if_paused(job_id: str, node_name: str): + """Block until resumed. Called after each node completes.""" + gate = _pause_gate.get(job_id) + if gate is None: + return + + # If pause-after-stage is on, pause now + if _pause_after_stage.get(job_id, False): + gate.clear() + from detect import emit + emit.log(job_id, "Pipeline", "INFO", f"Paused after {node_name}") + + # Block until gate is set (resume/step) or cancelled + while not gate.wait(timeout=0.5): + check = _cancel_check.get(job_id) + if check and check(): + raise PipelineCancelled(f"Cancelled while paused before next stage") + + def _checkpointing_node(node_name: str, node_fn): """Wrap a node function to auto-checkpoint after completion.""" @@ -81,6 +179,10 @@ def _checkpointing_node(node_name: str, node_fn): output_json=output_json, ) _latest_checkpoint[job_id] = new_checkpoint_id + + # Pause check — blocks if paused, respects cancel while waiting + _wait_if_paused(job_id, node_name) + return result wrapper.__name__ = node_fn.__name__ diff --git a/ui/detection-app/src/App.vue b/ui/detection-app/src/App.vue index 2e38800..30ff8dc 100644 --- a/ui/detection-app/src/App.vue +++ b/ui/detection-app/src/App.vue @@ -24,8 +24,10 @@ const logPanel = ref<{ clear: () => void } | null>(null) // SSE connection + pipeline status const { - jobId, stats, runContext, status, sseConnected, source, - stopPipeline, onJobStarted: sseJobStarted, + jobId, stats, runContext, status, paused, pauseAfterStage, + sseConnected, source, + stopPipeline, pausePipeline, resumePipeline, stepPipeline, + togglePauseAfterStage, onJobStarted: sseJobStarted, } = useSSEConnection() // Checkpoint frames + navigation @@ -50,9 +52,9 @@ function setCheckpointFrame(index: number) { } // Wire job start to clear log panel -function onJobStarted(newJobId: string) { +function onJobStarted(newJobId: string, opts?: { pauseAfterStage?: boolean }) { logPanel.value?.clear() - sseJobStarted(newJobId) + sseJobStarted(newJobId, opts) } @@ -62,20 +64,52 @@ function onJobStarted(newJobId: string) {

Detection Pipeline

{{ status }} - - {{ runContext.run_type }} · run: {{ runContext.run_id }} - - + + +
+ + + + + +
+ + PAUSED job: {{ jobId || '—' }}
@@ -203,19 +237,14 @@ function onJobStarted(newJobId: string) { - + - -
- - + +
+
@@ -283,6 +312,16 @@ header h1 { font-size: var(--font-size-lg); font-weight: 600; } background: var(--surface-3); color: var(--text-primary); } +.transport { + display: flex; + align-items: center; + gap: 2px; +} + +.play-btn { color: var(--status-live); } +.pause-btn { color: var(--text-secondary); } +.step-btn { color: var(--text-secondary); } +.step-btn:disabled { opacity: 0.3; cursor: not-allowed; } .stop-btn { background: var(--status-error); color: #000; @@ -293,6 +332,19 @@ header h1 { font-size: var(--font-size-lg); font-weight: 600; } opacity: 0.8; } +.pause-toggle { + display: flex; + align-items: center; + gap: 4px; + font-size: 10px; + color: var(--text-dim); + cursor: pointer; + margin-left: 4px; +} +.pause-toggle input { accent-color: var(--status-processing); } + +.status-badge.paused { background: var(--status-processing); color: #000; } + .job-id { color: var(--text-dim); font-size: var(--font-size-sm); margin-left: auto; } .stats-col { diff --git a/ui/detection-app/src/composables/useSSEConnection.ts b/ui/detection-app/src/composables/useSSEConnection.ts index 28bea6b..b845a15 100644 --- a/ui/detection-app/src/composables/useSSEConnection.ts +++ b/ui/detection-app/src/composables/useSSEConnection.ts @@ -68,6 +68,9 @@ export function useSSEConnection() { sseConnected.value = true } + const paused = ref(false) + const pauseAfterStage = ref(false) + async function stopPipeline() { if (!jobId.value) return try { @@ -75,11 +78,70 @@ export function useSSEConnection() { } catch { /* ignore — UI will see the cancel event via SSE */ } } - function onJobStarted(newJobId: string) { + async function pausePipeline() { + if (!jobId.value) return + try { + await fetch(`/api/detect/pause/${jobId.value}`, { method: 'POST' }) + paused.value = true + } catch { /* ignore */ } + } + + async function resumePipeline() { + if (!jobId.value) return + try { + await fetch(`/api/detect/resume/${jobId.value}`, { method: 'POST' }) + paused.value = false + } catch { /* ignore */ } + } + + async function stepPipeline() { + if (!jobId.value) return + try { + await fetch(`/api/detect/step/${jobId.value}`, { method: 'POST' }) + paused.value = false // briefly unpaused, will re-pause after stage + } catch { /* ignore */ } + } + + async function togglePauseAfterStage() { + if (!jobId.value) return + const next = !pauseAfterStage.value + try { + await fetch(`/api/detect/pause-after-stage/${jobId.value}?enabled=${next}`, { method: 'POST' }) + pauseAfterStage.value = next + } catch { /* ignore */ } + } + + // Poll pipeline status to track paused state + // (SSE doesn't emit pause events — the thread is blocked) + let statusPoll: ReturnType | null = null + + function startStatusPoll() { + if (statusPoll) return + statusPoll = setInterval(async () => { + if (!jobId.value) return + try { + const resp = await fetch(`/api/detect/status/${jobId.value}`) + if (!resp.ok) return + const data = await resp.json() + paused.value = data.status === 'paused' + } catch { /* ignore */ } + }, 1000) + } + + function stopStatusPoll() { + if (statusPoll) { + clearInterval(statusPoll) + statusPoll = null + } + } + + function onJobStarted(newJobId: string, opts?: { pauseAfterStage?: boolean }) { jobId.value = newJobId stats.value = null runContext.value = null status.value = 'processing' + paused.value = false + pauseAfterStage.value = opts?.pauseAfterStage ?? false pipeline.reset() pipeline.setStatus('running') // Update URL without reload @@ -91,16 +153,34 @@ export function useSSEConnection() { source.setUrl(`/api/detect/stream/${newJobId}`) source.connect() sseConnected.value = true + startStatusPoll() } + // Start polling if we already have an active job + if (jobId.value && sseConnected.value) { + startStatusPoll() + } + + // Stop polling when job completes + source.on<{ report?: { status?: string } }>('job_complete', () => { + stopStatusPoll() + paused.value = false + }) + return { jobId, stats, runContext, status, + paused, + pauseAfterStage, sseConnected, source: source as DataSource, stopPipeline, + pausePipeline, + resumePipeline, + stepPipeline, + togglePauseAfterStage, onJobStarted, } } diff --git a/ui/detection-app/src/panels/PipelineGraphPanel.vue b/ui/detection-app/src/panels/PipelineGraphPanel.vue index b236ba6..ecd63ea 100644 --- a/ui/detection-app/src/panels/PipelineGraphPanel.vue +++ b/ui/detection-app/src/panels/PipelineGraphPanel.vue @@ -17,6 +17,7 @@ const nodes = ref([]) // Derive graph mode from pipeline layout mode const graphMode = computed(() => { + if (pipeline.layoutMode === 'source_selector') return 'observe' if (pipeline.layoutMode === 'bbox_editor') return 'edit-isolated' if (pipeline.layoutMode === 'stage_editor') return 'edit-in-pipeline' return 'observe' @@ -29,6 +30,20 @@ watch(stageNames, (names) => { } }, { immediate: true }) +// Source selector: placeholders until a chunk is selected, then real stage names +const displayNodes = computed(() => { + if (pipeline.layoutMode === 'source_selector') { + if (pipeline.sourceHasSelection) { + return stageNames.value.map((id) => ({ id, status: 'pending' as const })) + } + return Array.from({ length: 10 }, (_, i) => ({ + id: `_placeholder_${i}`, + status: 'placeholder' as const, + })) + } + return nodes.value +}) + props.source.on<{ nodes: GraphNode[] }>('graph_update', (e) => { nodes.value = e.nodes }) @@ -60,7 +75,7 @@ function onOpenStageEditor(stage: string) {