phase cv 0

This commit is contained in:
2026-03-26 22:22:35 -03:00
parent beb0416280
commit 65814b5b9e
46 changed files with 2962 additions and 268 deletions

View File

@@ -0,0 +1 @@
"""CV operations — pure OpenCV, no ML models."""

258
gpu/models/cv/edges.py Normal file
View File

@@ -0,0 +1,258 @@
"""
Edge detection — Canny + HoughLinesP → parallel line pairs → bounding boxes.
Finds horizontal line pairs with consistent spacing, which correspond to
the top and bottom edges of advertising hoardings.
"""
from __future__ import annotations
import base64
import io
import cv2
import numpy as np
def detect_edges(
    image: np.ndarray,
    canny_low: int = 50,
    canny_high: int = 150,
    hough_threshold: int = 80,
    hough_min_length: int = 100,
    hough_max_gap: int = 10,
    pair_max_distance: int = 200,
    pair_min_distance: int = 15,
) -> list[dict]:
    """
    Detect horizontal line pairs that plausibly bound advertising hoardings.

    Pipeline: Canny edges -> probabilistic Hough lines -> keep near-horizontal
    lines -> pair lines with vertical spacing in [pair_min_distance,
    pair_max_distance] -> convert each pair to a bounding box.

    Returns a list of dicts with keys: x, y, w, h, confidence, label.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    edge_map = cv2.Canny(gray, canny_low, canny_high)
    lines = cv2.HoughLinesP(
        edge_map,
        rho=1,
        theta=np.pi / 180,
        threshold=hough_threshold,
        minLineLength=hough_min_length,
        maxLineGap=hough_max_gap,
    )
    if lines is None:
        return []

    # Keep only lines within 10 degrees of horizontal.
    horizontals = _filter_horizontal(lines, max_angle_deg=10)
    if len(horizontals) < 2:
        return []

    # Pair up horizontals whose vertical gap falls in the allowed range.
    line_pairs = _find_line_pairs(
        horizontals,
        min_distance=pair_min_distance,
        max_distance=pair_max_distance,
    )

    frame_h, frame_w = image.shape[:2]
    candidates = [
        _pair_to_bbox(top, bottom, frame_width=frame_w, frame_height=frame_h)
        for top, bottom in line_pairs
    ]
    return [box for box in candidates if box is not None]
def _filter_horizontal(lines: np.ndarray, max_angle_deg: float = 10) -> list[tuple]:
"""Keep only lines within max_angle_deg of horizontal."""
max_slope = np.tan(np.radians(max_angle_deg))
result = []
for line in lines:
x1, y1, x2, y2 = line[0]
dx = x2 - x1
if dx == 0:
continue
slope = abs((y2 - y1) / dx)
if slope <= max_slope:
y_mid = (y1 + y2) / 2
x_min = min(x1, x2)
x_max = max(x1, x2)
length = np.sqrt(dx**2 + (y2 - y1) ** 2)
result.append((x_min, x_max, y_mid, length))
return result
def _find_line_pairs(
horizontals: list[tuple],
min_distance: int,
max_distance: int,
) -> list[tuple]:
"""
Find pairs of horizontal lines that could be top/bottom of a hoarding.
Lines must overlap horizontally and be spaced within [min_distance, max_distance].
"""
# Sort by y position
sorted_lines = sorted(horizontals, key=lambda l: l[2])
pairs = []
used = set()
for i, top in enumerate(sorted_lines):
if i in used:
continue
for j, bottom in enumerate(sorted_lines[i + 1 :], start=i + 1):
if j in used:
continue
y_gap = bottom[2] - top[2]
if y_gap < min_distance:
continue
if y_gap > max_distance:
break # sorted by y, no point checking further
# Check horizontal overlap
overlap_start = max(top[0], bottom[0])
overlap_end = min(top[1], bottom[1])
overlap = overlap_end - overlap_start
# Require at least 50% overlap relative to shorter line
shorter_length = min(top[1] - top[0], bottom[1] - bottom[0])
if shorter_length > 0 and overlap / shorter_length >= 0.5:
pairs.append((top, bottom))
used.add(i)
used.add(j)
break
return pairs
def _pair_to_bbox(
top: tuple,
bottom: tuple,
frame_width: int,
frame_height: int,
) -> dict | None:
"""Convert a line pair to a bounding box dict."""
x = int(max(0, min(top[0], bottom[0])))
y = int(max(0, top[2]))
x2 = int(min(frame_width, max(top[1], bottom[1])))
y2 = int(min(frame_height, bottom[2]))
w = x2 - x
h = y2 - y
if w < 20 or h < 5:
return None
# Confidence based on line lengths relative to box width
avg_line_length = (top[3] + bottom[3]) / 2
coverage = min(1.0, avg_line_length / max(w, 1))
return {
"x": x,
"y": y,
"w": w,
"h": h,
"confidence": round(coverage, 3),
"label": "edge_region",
}
def _np_to_b64_jpeg(image: np.ndarray, quality: int = 70) -> str:
    """JPEG-encode a numpy image (BGR or grayscale) and return base64 text.

    Returns an empty string when OpenCV fails to encode the image.
    """
    success, encoded = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality])
    return base64.b64encode(encoded.tobytes()).decode() if success else ""
def detect_edges_debug(
    image: np.ndarray,
    canny_low: int = 50,
    canny_high: int = 150,
    hough_threshold: int = 80,
    hough_min_length: int = 100,
    hough_max_gap: int = 10,
    pair_max_distance: int = 200,
    pair_min_distance: int = 15,
) -> dict:
    """
    Run the same pipeline as detect_edges but keep intermediate visuals.

    Returns dict with:
        regions: list[dict] — same boxes as detect_edges
        edge_overlay_b64: str — Canny edge image as base64 JPEG
        lines_overlay_b64: str — frame with Hough lines drawn
        horizontal_count: int — number of horizontal lines found
        pair_count: int — number of line pairs found
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    edge_map = cv2.Canny(gray, canny_low, canny_high)
    # First overlay: raw Canny output (white edges on black).
    edge_overlay_b64 = _np_to_b64_jpeg(edge_map)

    raw_lines = cv2.HoughLinesP(
        edge_map,
        rho=1,
        theta=np.pi / 180,
        threshold=hough_threshold,
        minLineLength=hough_min_length,
        maxLineGap=hough_max_gap,
    )

    # Second overlay: all Hough lines drawn on a BGR copy of the frame.
    overlay = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    horizontals: list[tuple] = []
    if raw_lines is not None:
        for segment in raw_lines:
            x1, y1, x2, y2 = segment[0]
            cv2.line(overlay, (x1, y1), (x2, y2), (0, 0, 255), 1)
        horizontals = _filter_horizontal(raw_lines, max_angle_deg=10)
        # Re-draw the near-horizontal subset in cyan, thicker.
        for x_min, x_max, y_mid, _ in horizontals:
            cv2.line(overlay, (int(x_min), int(y_mid)), (int(x_max), int(y_mid)), (255, 255, 0), 2)

    pairs: list[tuple] = []
    if len(horizontals) >= 2:
        pairs = _find_line_pairs(
            horizontals,
            min_distance=pair_min_distance,
            max_distance=pair_max_distance,
        )
        # Highlight matched pairs in green on top of the earlier strokes.
        for top_line, bottom_line in pairs:
            for ln in (top_line, bottom_line):
                cv2.line(overlay, (int(ln[0]), int(ln[2])), (int(ln[1]), int(ln[2])), (0, 255, 0), 2)

    lines_overlay_b64 = _np_to_b64_jpeg(overlay)

    # Region boxes — identical construction to detect_edges.
    frame_h, frame_w = image.shape[:2]
    regions = []
    for top_line, bottom_line in pairs:
        box = _pair_to_bbox(top_line, bottom_line, frame_width=frame_w, frame_height=frame_h)
        if box is not None:
            regions.append(box)
    return {
        "regions": regions,
        "edge_overlay_b64": edge_overlay_b64,
        "lines_overlay_b64": lines_overlay_b64,
        "horizontal_count": len(horizontals),
        "pair_count": len(pairs),
    }

View File

@@ -0,0 +1,112 @@
"""
Pydantic Models - GENERATED FILE
Do not edit directly. Regenerate using modelgen.
"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field
class DetectRequest(BaseModel):
    """Request body for object detection."""
    # Encoded image payload (presumably base64 — confirm against the client).
    image: str
    # Detector model name; server default applies when None.
    model: Optional[str] = None
    # Confidence-threshold override; server default applies when None.
    confidence: Optional[float] = None
    # Restrict detections to these class labels; None means all classes.
    target_classes: Optional[List[str]] = None
class BBox(BaseModel):
    """A detected bounding box."""
    # Top-left corner in pixels.
    x: int
    y: int
    # Width and height in pixels.
    w: int
    h: int
    # Detector confidence score.
    confidence: float
    # Predicted class label.
    label: str
class DetectResponse(BaseModel):
    """Response from object detection."""
    # All detected boxes; empty list when nothing is found.
    detections: List[BBox] = Field(default_factory=list)
class OCRRequest(BaseModel):
    """Request body for OCR."""
    # Encoded image payload (presumably base64 — confirm against the client).
    image: str
    # Language hints for the OCR engine; server default when None.
    languages: Optional[List[str]] = None
class OCRTextResult(BaseModel):
    """A single OCR text extraction result."""
    # Extracted text string.
    text: str
    # OCR engine confidence for this extraction.
    confidence: float
    # Bounding box of the text; layout of the ints is engine-defined — verify against producer.
    bbox: List[int] = Field(default_factory=list)
class OCRResponse(BaseModel):
    """Response from OCR."""
    # All text extractions; empty list when no text is found.
    results: List[OCRTextResult] = Field(default_factory=list)
class PreprocessRequest(BaseModel):
    """Request body for image preprocessing."""
    # Encoded image payload (presumably base64 — confirm against the client).
    image: str
    # Optional preprocessing steps; contrast enhancement is on by default.
    binarize: bool = False
    deskew: bool = False
    contrast: bool = True
class PreprocessResponse(BaseModel):
    """Response from preprocessing."""
    # Processed image, same encoding as the request payload.
    image: str
class VLMRequest(BaseModel):
    """Request body for visual language model query."""
    # Encoded image payload (presumably base64 — confirm against the client).
    image: str
    # Natural-language prompt to pose about the image.
    prompt: str
    # VLM model name; server default applies when None.
    model: Optional[str] = None
class VLMResponse(BaseModel):
    """Response from VLM."""
    # Identified brand name.
    brand: str
    # Model confidence in the identification.
    confidence: float
    # Model's explanation of how it arrived at the answer.
    reasoning: str
class AnalyzeRegionsRequest(BaseModel):
    """Request body for CV region analysis."""
    # Encoded image payload (presumably base64 — confirm against the client).
    image: str
    # Canny edge-detection thresholds (low/high hysteresis).
    edge_canny_low: int = 50
    edge_canny_high: int = 150
    # HoughLinesP parameters: accumulator threshold, minimum segment
    # length, and maximum gap bridged within one segment (pixels).
    edge_hough_threshold: int = 80
    edge_hough_min_length: int = 100
    edge_hough_max_gap: int = 10
    # Allowed vertical spacing between a paired top/bottom line (pixels).
    edge_pair_max_distance: int = 200
    edge_pair_min_distance: int = 15
class RegionBox(BaseModel):
    """A candidate region from CV analysis."""
    # Top-left corner in pixels.
    x: int
    y: int
    # Width and height in pixels.
    w: int
    h: int
    # Heuristic confidence for the region.
    confidence: float
    # Region label (e.g. "edge_region" from the edge detector).
    label: str
class AnalyzeRegionsResponse(BaseModel):
    """Response from CV region analysis."""
    # Candidate regions; empty list when none are found.
    regions: List[RegionBox] = Field(default_factory=list)
class AnalyzeRegionsDebugResponse(BaseModel):
    """Response from CV region analysis with debug overlays."""
    # Candidate regions; same boxes as the non-debug endpoint.
    regions: List[RegionBox] = Field(default_factory=list)
    # Canny edge image as base64 JPEG.
    edge_overlay_b64: str = ""
    # Frame with detected Hough lines drawn, as base64 JPEG.
    lines_overlay_b64: str = ""
    # Diagnostic counts from the pipeline.
    horizontal_count: int = 0
    pair_count: int = 0
class ConfigUpdate(BaseModel):
    """Request body for updating server configuration.

    All fields are optional; None means "leave the current value unchanged".
    """
    # Compute device selector (e.g. "cuda"/"cpu" — confirm accepted values server-side).
    device: Optional[str] = None
    # YOLO model name and confidence threshold.
    yolo_model: Optional[str] = None
    yolo_confidence: Optional[float] = None
    # VRAM budget in megabytes.
    vram_budget_mb: Optional[int] = None
    # Scheduling/loading strategy identifier — verify accepted values server-side.
    strategy: Optional[str] = None
    # OCR language list and minimum confidence cutoff.
    ocr_languages: Optional[List[str]] = None
    ocr_min_confidence: Optional[float] = None

View File

@@ -52,74 +52,25 @@ def _gpu_log(job_id: str, log_level: str, stage: str, level: str, msg: str):
emit_log(job_id, stage, level, msg, log_level=log_level)
# --- Request/Response models ---
# --- Request/Response models (generated from core/schema/models/inference.py) ---
class DetectRequest(BaseModel):
image: str
model: str | None = None
confidence: float | None = None
target_classes: list[str] | None = None
class BBox(BaseModel):
x: int
y: int
w: int
h: int
confidence: float
label: str
class DetectResponse(BaseModel):
detections: list[BBox]
class OCRRequest(BaseModel):
image: str
languages: list[str] | None = None
class OCRTextResult(BaseModel):
text: str
confidence: float
bbox: list[int]
class OCRResponse(BaseModel):
results: list[OCRTextResult]
class PreprocessRequest(BaseModel):
image: str
binarize: bool = False
deskew: bool = False
contrast: bool = True
class PreprocessResponse(BaseModel):
image: str # base64 JPEG of processed image
class VLMRequest(BaseModel):
image: str
prompt: str
model: str | None = None
class VLMResponse(BaseModel):
brand: str
confidence: float
reasoning: str
class ConfigUpdate(BaseModel):
device: str | None = None
yolo_model: str | None = None
yolo_confidence: float | None = None
vram_budget_mb: int | None = None
strategy: str | None = None
ocr_languages: list[str] | None = None
ocr_min_confidence: float | None = None
from models.inference_contract import (
AnalyzeRegionsDebugResponse,
AnalyzeRegionsRequest,
AnalyzeRegionsResponse,
BBox,
ConfigUpdate,
DetectRequest,
DetectResponse,
OCRRequest,
OCRResponse,
OCRTextResult,
PreprocessRequest,
PreprocessResponse,
RegionBox,
VLMRequest,
VLMResponse,
)
# --- App ---
@@ -281,6 +232,84 @@ def vlm(req: VLMRequest, request: Request):
return VLMResponse(**result)
@app.post("/detect_edges", response_model=AnalyzeRegionsResponse)
def detect_edges_endpoint(req: AnalyzeRegionsRequest, request: Request):
    """Run pure-CV edge/line analysis and return candidate region boxes.

    Raises HTTP 400 when the image cannot be decoded, HTTP 500 when the
    CV pipeline itself fails.
    """
    job_id, log_level = _job_ctx(request)
    try:
        image = _decode_image(req.image)
        h, w = image.shape[:2]
    except Exception as e:
        # Chain the cause so the original decode failure stays in the traceback.
        raise HTTPException(status_code=400, detail=f"Bad image: {e}") from e
    try:
        t0 = time.monotonic()
        from models.cv.edges import detect_edges
        edge_regions = detect_edges(
            image,
            canny_low=req.edge_canny_low,
            canny_high=req.edge_canny_high,
            hough_threshold=req.edge_hough_threshold,
            hough_min_length=req.edge_hough_min_length,
            hough_max_gap=req.edge_hough_max_gap,
            pair_max_distance=req.edge_pair_max_distance,
            pair_min_distance=req.edge_pair_min_distance,
        )
        infer_ms = (time.monotonic() - t0) * 1000
        _gpu_log(job_id, log_level, "GPU:CV", "DEBUG",
                 f"Edge analysis {w}x{h}: {infer_ms:.0f}ms → {len(edge_regions)} regions")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Region analysis failed: {e}") from e
    boxes = [RegionBox(**r) for r in edge_regions]
    return AnalyzeRegionsResponse(regions=boxes)
@app.post("/detect_edges/debug", response_model=AnalyzeRegionsDebugResponse)
def detect_edges_debug_endpoint(req: AnalyzeRegionsRequest, request: Request):
    """Run edge/line analysis and return regions plus debug overlays.

    Same pipeline as /detect_edges but also returns the Canny edge image,
    a lines-overlay frame, and diagnostic counts. Raises HTTP 400 when the
    image cannot be decoded, HTTP 500 when the CV pipeline fails.
    """
    job_id, log_level = _job_ctx(request)
    try:
        image = _decode_image(req.image)
        h, w = image.shape[:2]
    except Exception as e:
        # Chain the cause so the original decode failure stays in the traceback.
        raise HTTPException(status_code=400, detail=f"Bad image: {e}") from e
    try:
        t0 = time.monotonic()
        from models.cv.edges import detect_edges_debug
        result = detect_edges_debug(
            image,
            canny_low=req.edge_canny_low,
            canny_high=req.edge_canny_high,
            hough_threshold=req.edge_hough_threshold,
            hough_min_length=req.edge_hough_min_length,
            hough_max_gap=req.edge_hough_max_gap,
            pair_max_distance=req.edge_pair_max_distance,
            pair_min_distance=req.edge_pair_min_distance,
        )
        infer_ms = (time.monotonic() - t0) * 1000
        _gpu_log(job_id, log_level, "GPU:CV", "DEBUG",
                 f"Edge debug {w}x{h}: {infer_ms:.0f}ms → {len(result['regions'])} regions, "
                 f"{result['horizontal_count']} horizontals, {result['pair_count']} pairs")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Region debug analysis failed: {e}") from e
    boxes = [RegionBox(**r) for r in result["regions"]]
    response = AnalyzeRegionsDebugResponse(
        regions=boxes,
        edge_overlay_b64=result["edge_overlay_b64"],
        lines_overlay_b64=result["lines_overlay_b64"],
        horizontal_count=result["horizontal_count"],
        pair_count=result["pair_count"],
    )
    return response
if __name__ == "__main__":
import uvicorn