From bcf6f3dc71832391d80a67efd1b2346789627fa4 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Fri, 27 Mar 2026 05:19:45 -0300 Subject: [PATCH] use sqlalchemy pattern --- core/db/__init__.py | 42 +++--- core/db/assets.py | 59 ++------ core/db/brand.py | 61 +++++++++ core/db/checkpoint.py | 43 ++++++ core/db/connection.py | 3 +- core/db/detect.py | 239 --------------------------------- core/db/job.py | 19 +++ core/db/jobs.py | 49 ------- core/db/models.py | 238 -------------------------------- core/db/presets.py | 20 --- core/db/tables.py | 97 +++++++++++++ core/schema/__init__.py | 40 ++---- core/schema/models/__init__.py | 52 +++---- core/schema/models/job.py | 158 ++++++++++++++++++++++ 14 files changed, 451 insertions(+), 669 deletions(-) create mode 100644 core/db/brand.py create mode 100644 core/db/checkpoint.py delete mode 100644 core/db/detect.py create mode 100644 core/db/job.py delete mode 100644 core/db/jobs.py delete mode 100644 core/db/models.py delete mode 100644 core/db/presets.py create mode 100644 core/db/tables.py create mode 100644 core/schema/models/job.py diff --git a/core/db/__init__.py b/core/db/__init__.py index 899a17a..971b297 100644 --- a/core/db/__init__.py +++ b/core/db/__init__.py @@ -1,20 +1,24 @@ -from .assets import ( - create_asset, - delete_asset, - get_asset, - get_asset_filenames, - list_assets, - update_asset, -) -from .jobs import ( - create_job, - get_job, - list_jobs, - update_job, - update_job_fields, -) -from .presets import ( - get_preset, - list_presets, -) +""" +Database layer. 
+ +tables.py — SQLModel table definitions (generated by modelgen, don't edit) +domain files — session-first query functions for non-trivial operations + +Basic CRUD (create, get, update, delete) goes directly through the session: + session.add(Job(...)) + session.get(Job, id) + session.get(Job, id); setattr(...); session.commit() + session.delete(obj); session.commit() +""" + from .connection import get_session, create_tables + +from .tables import MediaAsset, Job, Timeline, Checkpoint, Brand + +from .assets import list_assets, get_asset_filenames +from .job import list_jobs +from .checkpoint import ( + get_latest_checkpoint, get_root_checkpoint, + list_checkpoints, list_scenarios, +) +from .brand import get_or_create_brand, find_brand_by_text, list_brands, record_airing diff --git a/core/db/assets.py b/core/db/assets.py index 8f849df..b0972c1 100644 --- a/core/db/assets.py +++ b/core/db/assets.py @@ -1,58 +1,23 @@ -"""Database operations for MediaAsset — SQLModel.""" +"""MediaAsset queries.""" from __future__ import annotations from typing import Optional from uuid import UUID -from sqlmodel import select +from sqlmodel import Session, select -from .connection import get_session -from .models import MediaAsset +from .tables import MediaAsset -def list_assets(status: Optional[str] = None, search: Optional[str] = None) -> list[MediaAsset]: - with get_session() as session: - stmt = select(MediaAsset) - if status: - stmt = stmt.where(MediaAsset.status == status) - if search: - stmt = stmt.where(MediaAsset.filename.ilike(f"%{search}%")) - return list(session.exec(stmt).all()) +def list_assets(session: Session, status: Optional[str] = None, search: Optional[str] = None) -> list[MediaAsset]: + stmt = select(MediaAsset) + if status: + stmt = stmt.where(MediaAsset.status == status) + if search: + stmt = stmt.where(MediaAsset.filename.ilike(f"%{search}%")) + return list(session.exec(stmt).all()) -def get_asset(id: UUID) -> MediaAsset | None: - with get_session() as session: 
- return session.get(MediaAsset, id) - - -def get_asset_filenames() -> set[str]: - with get_session() as session: - return set(session.exec(select(MediaAsset.filename)).all()) - - -def create_asset(*, filename: str, file_path: str, file_size: int) -> MediaAsset: - asset = MediaAsset(filename=filename, file_path=file_path, file_size=file_size) - with get_session() as session: - session.add(asset) - session.commit() - session.refresh(asset) - return asset - - -def update_asset(id: UUID, **fields) -> None: - with get_session() as session: - asset = session.get(MediaAsset, id) - if not asset: - return - for k, v in fields.items(): - setattr(asset, k, v) - session.commit() - - -def delete_asset(id: UUID) -> None: - with get_session() as session: - asset = session.get(MediaAsset, id) - if asset: - session.delete(asset) - session.commit() +def get_asset_filenames(session: Session) -> set[str]: + return set(session.exec(select(MediaAsset.filename)).all()) diff --git a/core/db/brand.py b/core/db/brand.py new file mode 100644 index 0000000..c8a9ee9 --- /dev/null +++ b/core/db/brand.py @@ -0,0 +1,61 @@ +"""Brand queries.""" + +from __future__ import annotations + +from typing import Optional +from uuid import UUID + +from sqlmodel import Session, select + +from .tables import Brand + + +def get_or_create_brand(session: Session, canonical_name: str, + aliases: Optional[list[str]] = None, + source: str = "ocr") -> tuple[Brand, bool]: + normalized = canonical_name.strip() + brand = session.exec(select(Brand).where(Brand.canonical_name.ilike(normalized))).first() + if brand: + return brand, False + + brand = Brand(canonical_name=normalized, aliases=aliases or [], source=source) + session.add(brand) + session.flush() + return brand, True + + +def find_brand_by_text(session: Session, text: str) -> Brand | None: + normalized = text.strip().lower() + brand = session.exec(select(Brand).where(Brand.canonical_name.ilike(normalized))).first() + if brand: + return brand + + for b in 
session.exec(select(Brand)).all(): + if normalized in [a.lower() for a in (b.aliases or [])]: + return b + return None + + +def list_brands(session: Session) -> list[Brand]: + return list(session.exec(select(Brand).order_by(Brand.canonical_name)).all()) + + +def record_airing(session: Session, brand_id: UUID, timeline_id: UUID, + frame_start: int, frame_end: int, + confidence: float, source: str = "ocr") -> Brand: + brand = session.get(Brand, brand_id) + if not brand: + raise ValueError(f"Brand not found: {brand_id}") + + airing = { + "timeline_id": str(timeline_id), + "frame_start": frame_start, + "frame_end": frame_end, + "confidence": confidence, + "source": source, + } + airings = list(brand.airings or []) + airings.append(airing) + brand.airings = airings + brand.total_airings = len(airings) + return brand diff --git a/core/db/checkpoint.py b/core/db/checkpoint.py new file mode 100644 index 0000000..1669b6b --- /dev/null +++ b/core/db/checkpoint.py @@ -0,0 +1,43 @@ +"""Checkpoint queries.""" + +from __future__ import annotations + +from uuid import UUID + +from sqlmodel import Session, select + +from .tables import Checkpoint + + +def get_latest_checkpoint(session: Session, timeline_id: UUID, parent_id: UUID | None = None) -> Checkpoint | None: + stmt = select(Checkpoint).where(Checkpoint.timeline_id == timeline_id) + if parent_id is not None: + stmt = stmt.where(Checkpoint.parent_id == parent_id) + stmt = stmt.order_by(Checkpoint.created_at.desc()) + return session.exec(stmt).first() + + +def get_root_checkpoint(session: Session, timeline_id: UUID) -> Checkpoint | None: + stmt = select(Checkpoint).where( + Checkpoint.timeline_id == timeline_id, + Checkpoint.parent_id == None, + ) + return session.exec(stmt).first() + + +def list_checkpoints(session: Session, timeline_id: UUID) -> list[Checkpoint]: + stmt = ( + select(Checkpoint) + .where(Checkpoint.timeline_id == timeline_id) + .order_by(Checkpoint.created_at) + ) + return list(session.exec(stmt).all()) + + 
+def list_scenarios(session: Session) -> list[Checkpoint]: + stmt = ( + select(Checkpoint) + .where(Checkpoint.is_scenario == True) + .order_by(Checkpoint.created_at.desc()) + ) + return list(session.exec(stmt).all()) diff --git a/core/db/connection.py b/core/db/connection.py index ef89009..a72b213 100644 --- a/core/db/connection.py +++ b/core/db/connection.py @@ -29,5 +29,6 @@ def get_session() -> Session: def create_tables(): """Create all SQLModel tables.""" - from .models import SQLModel # noqa — registers all models + from sqlmodel import SQLModel + from . import tables # noqa — registers all table classes SQLModel.metadata.create_all(get_engine()) diff --git a/core/db/detect.py b/core/db/detect.py deleted file mode 100644 index 58f152c..0000000 --- a/core/db/detect.py +++ /dev/null @@ -1,239 +0,0 @@ -"""Database operations for detection pipeline — SQLModel.""" - -from __future__ import annotations - -from typing import Optional -from uuid import UUID - -from sqlmodel import select - -from .connection import get_session -from .models import ( - DetectJob, Timeline, Checkpoint, - KnownBrand, SourceBrandSighting, -) - - -# --------------------------------------------------------------------------- -# DetectJob -# --------------------------------------------------------------------------- - -def create_detect_job(**fields) -> DetectJob: - job = DetectJob(**fields) - with get_session() as session: - session.add(job) - session.commit() - session.refresh(job) - return job - - -def get_detect_job(id: UUID) -> DetectJob | None: - with get_session() as session: - return session.get(DetectJob, id) - - -def update_detect_job(job_id: UUID, **fields) -> None: - with get_session() as session: - job = session.get(DetectJob, job_id) - if not job: - return - for k, v in fields.items(): - setattr(job, k, v) - session.commit() - - -def list_detect_jobs( - parent_job_id: Optional[UUID] = None, - status: Optional[str] = None, -) -> list[DetectJob]: - with get_session() as session: 
- stmt = select(DetectJob) - if parent_job_id: - stmt = stmt.where(DetectJob.parent_job_id == parent_job_id) - if status: - stmt = stmt.where(DetectJob.status == status) - return list(session.exec(stmt).all()) - - -# --------------------------------------------------------------------------- -# Timeline -# --------------------------------------------------------------------------- - -def create_timeline(**fields) -> Timeline: - timeline = Timeline(**fields) - with get_session() as session: - session.add(timeline) - session.commit() - session.refresh(timeline) - return timeline - - -def get_timeline(timeline_id: UUID) -> Timeline | None: - with get_session() as session: - return session.get(Timeline, timeline_id) - - -# --------------------------------------------------------------------------- -# Checkpoint -# --------------------------------------------------------------------------- - -def save_checkpoint(**fields) -> Checkpoint: - checkpoint = Checkpoint(**fields) - with get_session() as session: - session.add(checkpoint) - session.commit() - session.refresh(checkpoint) - return checkpoint - - -def get_checkpoint(checkpoint_id: UUID) -> Checkpoint | None: - with get_session() as session: - return session.get(Checkpoint, checkpoint_id) - - -def get_latest_checkpoint(timeline_id: UUID, parent_id: UUID | None = None) -> Checkpoint | None: - """Get the most recent checkpoint for a timeline, optionally from a specific parent.""" - with get_session() as session: - stmt = ( - select(Checkpoint) - .where(Checkpoint.timeline_id == timeline_id) - ) - if parent_id is not None: - stmt = stmt.where(Checkpoint.parent_id == parent_id) - stmt = stmt.order_by(Checkpoint.created_at.desc()) - return session.exec(stmt).first() - - -def list_checkpoints(timeline_id: UUID) -> list[Checkpoint]: - """List all checkpoints for a timeline.""" - with get_session() as session: - stmt = ( - select(Checkpoint) - .where(Checkpoint.timeline_id == timeline_id) - .order_by(Checkpoint.created_at) 
- ) - return list(session.exec(stmt).all()) - - -def get_root_checkpoint(timeline_id: UUID) -> Checkpoint | None: - """Get the root checkpoint (no parent) for a timeline.""" - with get_session() as session: - stmt = select(Checkpoint).where( - Checkpoint.timeline_id == timeline_id, - Checkpoint.parent_id == None, - ) - return session.exec(stmt).first() - - -def list_scenarios() -> list[Checkpoint]: - """List all checkpoints marked as scenarios.""" - with get_session() as session: - stmt = ( - select(Checkpoint) - .where(Checkpoint.is_scenario == True) - .order_by(Checkpoint.created_at.desc()) - ) - return list(session.exec(stmt).all()) - - -# --------------------------------------------------------------------------- -# KnownBrand -# --------------------------------------------------------------------------- - -def get_or_create_brand(canonical_name: str, aliases: Optional[list[str]] = None, - source: str = "ocr") -> tuple[KnownBrand, bool]: - normalized = canonical_name.strip() - with get_session() as session: - stmt = select(KnownBrand).where(KnownBrand.canonical_name.ilike(normalized)) - brand = session.exec(stmt).first() - if brand: - return brand, False - - brand = KnownBrand( - canonical_name=normalized, - aliases=aliases or [], - first_source=source, - ) - session.add(brand) - session.commit() - session.refresh(brand) - return brand, True - - -def find_brand_by_text(text: str) -> KnownBrand | None: - normalized = text.strip().lower() - with get_session() as session: - stmt = select(KnownBrand).where(KnownBrand.canonical_name.ilike(normalized)) - brand = session.exec(stmt).first() - if brand: - return brand - - # Alias search — check if normalized is in any brand's aliases - all_brands = session.exec(select(KnownBrand)).all() - for b in all_brands: - if normalized in [a.lower() for a in (b.aliases or [])]: - return b - return None - - -def list_all_brands() -> list[KnownBrand]: - with get_session() as session: - return 
list(session.exec(select(KnownBrand).order_by(KnownBrand.canonical_name)).all()) - - -def update_brand(brand_id: UUID, **fields) -> None: - with get_session() as session: - brand = session.get(KnownBrand, brand_id) - if not brand: - return - for k, v in fields.items(): - setattr(brand, k, v) - session.commit() - - -# --------------------------------------------------------------------------- -# SourceBrandSighting -# --------------------------------------------------------------------------- - -def get_source_sightings(source_asset_id: UUID) -> list[SourceBrandSighting]: - with get_session() as session: - stmt = ( - select(SourceBrandSighting) - .where(SourceBrandSighting.source_asset_id == source_asset_id) - .order_by(SourceBrandSighting.occurrences.desc()) - ) - return list(session.exec(stmt).all()) - - -def record_sighting(source_asset_id: UUID, brand_id: UUID, brand_name: str, - timestamp: float, confidence: float, source: str = "ocr") -> SourceBrandSighting: - with get_session() as session: - stmt = select(SourceBrandSighting).where( - SourceBrandSighting.source_asset_id == source_asset_id, - SourceBrandSighting.brand_id == brand_id, - ) - sighting = session.exec(stmt).first() - - if sighting: - total_conf = sighting.avg_confidence * sighting.occurrences + confidence - sighting.occurrences += 1 - sighting.last_seen_timestamp = timestamp - sighting.avg_confidence = total_conf / sighting.occurrences - session.commit() - session.refresh(sighting) - return sighting - - sighting = SourceBrandSighting( - source_asset_id=source_asset_id, - brand_id=brand_id, - brand_name=brand_name, - first_seen_timestamp=timestamp, - last_seen_timestamp=timestamp, - occurrences=1, - detection_source=source, - avg_confidence=confidence, - ) - session.add(sighting) - session.commit() - session.refresh(sighting) - return sighting diff --git a/core/db/job.py b/core/db/job.py new file mode 100644 index 0000000..0d17927 --- /dev/null +++ b/core/db/job.py @@ -0,0 +1,19 @@ +"""Job 
queries.""" + +from __future__ import annotations + +from typing import Optional +from uuid import UUID + +from sqlmodel import Session, select + +from .tables import Job + + +def list_jobs(session: Session, parent_id: Optional[UUID] = None, status: Optional[str] = None) -> list[Job]: + stmt = select(Job) + if parent_id: + stmt = stmt.where(Job.parent_id == parent_id) + if status: + stmt = stmt.where(Job.status == status) + return list(session.exec(stmt).all()) diff --git a/core/db/jobs.py b/core/db/jobs.py deleted file mode 100644 index cc73abd..0000000 --- a/core/db/jobs.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Database operations for TranscodeJob — SQLModel.""" - -from __future__ import annotations - -from typing import Optional -from uuid import UUID - -from sqlmodel import select - -from .connection import get_session -from .models import TranscodeJob - - -def list_jobs(status: Optional[str] = None, source_asset_id: Optional[UUID] = None) -> list[TranscodeJob]: - with get_session() as session: - stmt = select(TranscodeJob) - if status: - stmt = stmt.where(TranscodeJob.status == status) - if source_asset_id: - stmt = stmt.where(TranscodeJob.source_asset_id == source_asset_id) - return list(session.exec(stmt).all()) - - -def get_job(id: UUID) -> TranscodeJob | None: - with get_session() as session: - return session.get(TranscodeJob, id) - - -def create_job(**fields) -> TranscodeJob: - job = TranscodeJob(**fields) - with get_session() as session: - session.add(job) - session.commit() - session.refresh(job) - return job - - -def update_job(id: UUID, **fields) -> None: - with get_session() as session: - job = session.get(TranscodeJob, id) - if not job: - return - for k, v in fields.items(): - setattr(job, k, v) - session.commit() - - -def update_job_fields(job_id: UUID, **fields) -> None: - update_job(job_id, **fields) diff --git a/core/db/models.py b/core/db/models.py deleted file mode 100644 index 452bbfd..0000000 --- a/core/db/models.py +++ /dev/null @@ -1,238 +0,0 
@@ -""" -SQLModel Table Models - GENERATED FILE - -Do not edit directly. Regenerate using modelgen. -""" - -from datetime import datetime -from enum import Enum -from typing import Any, Dict, List, Optional -from uuid import UUID, uuid4 - -from sqlmodel import SQLModel, Field, Column -from sqlalchemy import JSON - -class AssetStatus(str, Enum): - PENDING = "pending" - READY = "ready" - ERROR = "error" - -class JobStatus(str, Enum): - PENDING = "pending" - PROCESSING = "processing" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - -class ChunkJobStatus(str, Enum): - PENDING = "pending" - CHUNKING = "chunking" - PROCESSING = "processing" - COLLECTING = "collecting" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - -class DetectJobStatus(str, Enum): - PENDING = "pending" - RUNNING = "running" - PAUSED = "paused" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - -class RunType(str, Enum): - INITIAL = "initial" - REPLAY = "replay" - RETRY = "retry" - -class BrandSource(str, Enum): - OCR = "ocr" - VLM = "local_vlm" - CLOUD = "cloud_llm" - MANUAL = "manual" - -class SourceType(str, Enum): - CHUNK_JOB = "chunk_job" - UPLOAD = "upload" - DEVICE = "device" - STREAM = "stream" - -class MediaAsset(SQLModel, table=True): - """A video/audio file registered in the system.""" - __tablename__ = "media_assets" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - filename: str - file_path: str - status: AssetStatus = "pending" - error_message: Optional[str] = None - file_size: Optional[int] = None - duration: Optional[float] = None - video_codec: Optional[str] = None - audio_codec: Optional[str] = None - width: Optional[int] = None - height: Optional[int] = None - framerate: Optional[float] = None - bitrate: Optional[int] = None - properties: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - comments: str = "" - tags: List[str] = 
Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - updated_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - -class TranscodePreset(SQLModel, table=True): - """A reusable transcoding configuration (like Handbrake presets).""" - __tablename__ = "transcode_presets" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - name: str - description: str = "" - is_builtin: bool = False - container: str = "mp4" - video_codec: str = "libx264" - video_bitrate: Optional[str] = None - video_crf: Optional[int] = None - video_preset: Optional[str] = None - resolution: Optional[str] = None - framerate: Optional[float] = None - audio_codec: str = "aac" - audio_bitrate: Optional[str] = None - audio_channels: Optional[int] = None - audio_samplerate: Optional[int] = None - extra_args: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - updated_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - -class TranscodeJob(SQLModel, table=True): - """A transcoding or trimming job in the queue.""" - __tablename__ = "transcode_jobs" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - source_asset_id: UUID = Field(index=True) - preset_id: Optional[UUID] = None - preset_snapshot: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - trim_start: Optional[float] = None - trim_end: Optional[float] = None - output_filename: str = "" - output_path: Optional[str] = None - output_asset_id: Optional[UUID] = None - status: JobStatus = "pending" - progress: float = 0.0 - current_frame: Optional[int] = None - current_time: Optional[float] = None - speed: Optional[str] = None - error_message: Optional[str] = None - celery_task_id: Optional[str] = None - execution_arn: 
Optional[str] = None - priority: int = 0 - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - -class ChunkJob(SQLModel, table=True): - """A chunk pipeline job — splits a media file into chunks and processes them""" - __tablename__ = "chunk_jobs" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - source_asset_id: UUID = Field(index=True) - chunk_duration: float = 10.0 - num_workers: int = 4 - max_retries: int = 3 - processor_type: str = "ffmpeg" - status: ChunkJobStatus = "pending" - progress: float = 0.0 - total_chunks: int = 0 - processed_chunks: int = 0 - failed_chunks: int = 0 - retry_count: int = 0 - error_message: Optional[str] = None - throughput_mbps: Optional[float] = None - elapsed_seconds: Optional[float] = None - celery_task_id: Optional[str] = None - priority: int = 0 - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - -class DetectJob(SQLModel, table=True): - """A detection pipeline job.""" - __tablename__ = "detect_jobs" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - source_asset_id: UUID = Field(index=True) - video_path: str - profile_name: str = "soccer_broadcast" - parent_job_id: Optional[UUID] = Field(default=None, index=True) - run_type: RunType = "initial" - replay_from_stage: Optional[str] = None - config_overrides: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - status: DetectJobStatus = "pending" - current_stage: Optional[str] = None - progress: float = 0.0 - error_message: Optional[str] = None - total_detections: int = 0 - brands_found: int = 0 - cloud_llm_calls: int = 0 - estimated_cost_usd: float = 0.0 - celery_task_id: Optional[str] = None - priority: int = 0 - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - 
started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - -class Timeline(SQLModel, table=True): - """Frame sequence from a source video. Independent of stages.""" - __tablename__ = "timelines" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - source_asset_id: Optional[UUID] = Field(default=None, index=True) - source_video: str = "" - profile_name: str = "" - fps: float = 2.0 - frames_prefix: str = "" - frames_manifest: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - frames_meta: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - -class Checkpoint(SQLModel, table=True): - """Snapshot of pipeline state. parent_id forms a tree.""" - __tablename__ = "checkpoints" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - timeline_id: UUID = Field(index=True) - parent_id: Optional[UUID] = Field(default=None, index=True) - stage_outputs: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - config_overrides: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - stats: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) - is_scenario: bool = False - scenario_label: str = "" - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - -class KnownBrand(SQLModel, table=True): - """A brand discovered or registered in the system.""" - __tablename__ = "known_brands" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - canonical_name: str = Field(index=True) - aliases: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) - first_source: BrandSource = "ocr" - total_occurrences: int = 0 - confirmed: bool = False - 
created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - updated_at: Optional[datetime] = Field(default_factory=datetime.utcnow) - -class SourceBrandSighting(SQLModel, table=True): - """A brand seen in a specific source (video/asset).""" - __tablename__ = "source_brand_sightings" - - id: UUID = Field(default_factory=uuid4, primary_key=True) - source_asset_id: UUID = Field(index=True) - brand_id: UUID - brand_name: str - first_seen_timestamp: float = 0.0 - last_seen_timestamp: float = 0.0 - occurrences: int = 0 - detection_source: BrandSource = "ocr" - avg_confidence: float = 0.0 - created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) diff --git a/core/db/presets.py b/core/db/presets.py deleted file mode 100644 index 72dda4a..0000000 --- a/core/db/presets.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Database operations for TranscodePreset — SQLModel.""" - -from __future__ import annotations - -from uuid import UUID - -from sqlmodel import select - -from .connection import get_session -from .models import TranscodePreset - - -def list_presets() -> list[TranscodePreset]: - with get_session() as session: - return list(session.exec(select(TranscodePreset)).all()) - - -def get_preset(id: UUID) -> TranscodePreset | None: - with get_session() as session: - return session.get(TranscodePreset, id) diff --git a/core/db/tables.py b/core/db/tables.py new file mode 100644 index 0000000..d1d8489 --- /dev/null +++ b/core/db/tables.py @@ -0,0 +1,97 @@ +""" +SQLModel table definitions. + +Generated by modelgen from core/schema/models/. Do not edit directly. 
+""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional +from uuid import UUID, uuid4 + +from sqlalchemy import JSON +from sqlmodel import Column, Field, SQLModel + + +class MediaAsset(SQLModel, table=True): + __tablename__ = "media_asset" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + filename: str + path: str + status: str = "pending" + size_bytes: int = 0 + duration_seconds: float = 0.0 + width: Optional[int] = None + height: Optional[int] = None + fps: Optional[float] = None + codec: Optional[str] = None + created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) + + +class Job(SQLModel, table=True): + __tablename__ = "job" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + source_asset_id: UUID = Field(index=True) + video_path: str + profile_name: str = "soccer_broadcast" + parent_id: Optional[UUID] = Field(default=None, index=True) + run_type: str = "initial" + config_overrides: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + status: str = "pending" + current_stage: Optional[str] = None + progress: float = 0.0 + error_message: Optional[str] = None + total_detections: int = 0 + brands_found: int = 0 + cloud_llm_calls: int = 0 + estimated_cost_usd: float = 0.0 + celery_task_id: Optional[str] = None + priority: int = 0 + created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + +class Timeline(SQLModel, table=True): + __tablename__ = "timeline" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + source_asset_id: Optional[UUID] = Field(default=None, index=True) + source_video: str = "" + profile_name: str = "" + fps: float = 2.0 + frames_prefix: str = "" + frames_manifest: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + 
frames_meta: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) + created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) + + +class Checkpoint(SQLModel, table=True): + __tablename__ = "checkpoint" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + timeline_id: UUID = Field(index=True) + parent_id: Optional[UUID] = Field(default=None, index=True) + stage_outputs: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + config_overrides: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + stats: Dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON, nullable=False, server_default='{}')) + is_scenario: bool = False + scenario_label: str = "" + created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) + + +class Brand(SQLModel, table=True): + __tablename__ = "brand" + + id: UUID = Field(default_factory=uuid4, primary_key=True) + canonical_name: str = Field(index=True) + aliases: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) + source: str = "ocr" + confirmed: bool = False + airings: List[str] = Field(default_factory=list, sa_column=Column(JSON, nullable=False, server_default='[]')) + total_airings: int = 0 + created_at: Optional[datetime] = Field(default_factory=datetime.utcnow) + updated_at: Optional[datetime] = Field(default_factory=datetime.utcnow) diff --git a/core/schema/__init__.py b/core/schema/__init__.py index 68d7bf3..296a5e5 100644 --- a/core/schema/__init__.py +++ b/core/schema/__init__.py @@ -3,23 +3,18 @@ MPR Schema Definitions - Source of Truth This package defines the core data models as Python dataclasses. 
These definitions are used to generate: -- Django ORM models (mpr/media_assets/models.py) -- Pydantic schemas (api/schema/*.py) -- TypeScript types (ui/timeline/src/types.ts) -- Protobuf definitions (grpc/protos/worker.proto) - -Run `python schema/generate.py` to regenerate all targets. +- SQLModel models (core/db/models.py) +- Pydantic schemas (detect/sse_contract.py, gpu/models/inference_contract.py) +- TypeScript types (ui/) +- Protobuf definitions (core/rpc/protos/worker.proto) """ from .models import ( BUILTIN_PRESETS, - # For generator DATACLASSES, ENUMS, GRPC_MESSAGES, - # gRPC GRPC_SERVICE, - # Enums AssetStatus, CancelRequest, CancelResponse, @@ -27,32 +22,13 @@ from .models import ( JobRequest, JobResponse, JobStatus, - # Models MediaAsset, ProgressRequest, ProgressUpdate, - TranscodeJob, TranscodePreset, WorkerStatus, + Job, + Timeline, + Checkpoint, + Brand, ) - -__all__ = [ - "MediaAsset", - "TranscodePreset", - "TranscodeJob", - "AssetStatus", - "JobStatus", - "GRPC_SERVICE", - "JobRequest", - "JobResponse", - "ProgressRequest", - "ProgressUpdate", - "CancelRequest", - "CancelResponse", - "WorkerStatus", - "Empty", - "DATACLASSES", - "ENUMS", - "GRPC_MESSAGES", - "BUILTIN_PRESETS", -] diff --git a/core/schema/models/__init__.py b/core/schema/models/__init__.py index 4e873eb..2d96d1f 100644 --- a/core/schema/models/__init__.py +++ b/core/schema/models/__init__.py @@ -25,28 +25,25 @@ from .grpc import ( ProgressUpdate, WorkerStatus, ) -from .jobs import ChunkJob, ChunkJobStatus, JobStatus, TranscodeJob -from .detect_jobs import ( - DetectJob, DetectJobStatus, RunType, +from .job import ( + Job, JobStatus, RunType, Timeline, Checkpoint, - BrandSource, KnownBrand, SourceBrandSighting, + BrandSource, Brand, ) -from .stages import StageConfigField, StageIO, StageDefinition, STAGE_VIEWS from .media import AssetStatus, MediaAsset from .presets import BUILTIN_PRESETS, TranscodePreset from .detect import DETECT_VIEWS # noqa: F401 — discovered by modelgen 
generic loader
 from .inference import INFERENCE_VIEWS  # noqa: F401 — GPU inference server API types
 from .ui_state import UI_STATE_VIEWS  # noqa: F401 — UI store state types
+from .stages import StageConfigField, StageIO, StageDefinition, STAGE_VIEWS  # noqa: F401
 from .views import ChunkEvent, ChunkOutputFile, PipelineStats, WorkerEvent
 from .sources import ChunkInfo, SourceJob, SourceType
 
-# Core domain models - generates Django, SQLModel, TypeScript
-DATACLASSES = [MediaAsset, TranscodePreset, TranscodeJob, ChunkJob,
-               DetectJob, Timeline, Checkpoint,
-               KnownBrand, SourceBrandSighting]
+# Core domain models - generates SQLModel, TypeScript
+DATACLASSES = [MediaAsset, TranscodePreset,
+               Job, Timeline, Checkpoint, Brand]
 
-# API request/response models - generates TypeScript only (no Django)
-# WorkerStatus from grpc.py is reused here
+# API request/response models
 API_MODELS = [
     CreateJobRequest,
     UpdateAssetRequest,
@@ -58,14 +55,13 @@ API_MODELS = [
     ChunkInfo,
 ]
 
-# Status enums - included in generated code
-ENUMS = [AssetStatus, JobStatus, ChunkJobStatus, DetectJobStatus, RunType, BrandSource, SourceType]
+# Status enums
+ENUMS = [AssetStatus, JobStatus, RunType, BrandSource, SourceType]
 
-# View/event models - generates TypeScript for UI consumption
+# View/event models
 VIEWS = [ChunkEvent, WorkerEvent, PipelineStats, ChunkOutputFile]
-
-# gRPC messages - generates Proto
+# gRPC messages
 GRPC_MESSAGES = [
     JobRequest,
     JobResponse,
@@ -83,18 +79,27 @@ __all__ = [
     # Models
     "MediaAsset",
     "TranscodePreset",
-    "TranscodeJob",
-    "ChunkJob",
-    # API Models
+    "Job",
+    "Timeline",
+    "Checkpoint",
+    "Brand",
+    # (KnownBrand and SourceBrandSighting were merged into Brand)
+    # Enums
+    "AssetStatus",
+    "JobStatus",
+    "RunType",
+    "BrandSource",
+    "SourceType",
+    # Stages
+    "StageConfigField",
+    "StageIO",
+    "StageDefinition",
+    # API
     "CreateJobRequest",
     "UpdateAssetRequest",
     "DeleteResult",
     "ScanResult",
     "SystemStatus",
-    # Enums
-    "AssetStatus",
-    "JobStatus",
-    "ChunkJobStatus",
     # gRPC
     "GRPC_SERVICE",
"JobRequest", @@ -113,7 +118,6 @@ __all__ = [ "PipelineStats", "ChunkOutputFile", # Sources - "SourceType", "SourceJob", "ChunkInfo", # For generator diff --git a/core/schema/models/job.py b/core/schema/models/job.py new file mode 100644 index 0000000..a3b5391 --- /dev/null +++ b/core/schema/models/job.py @@ -0,0 +1,158 @@ +""" +Job, Timeline, and Checkpoint Schema Definitions + +Source of truth for pipeline jobs, timelines, and checkpoints. +Generates: SQLModel (core/db/models.py), TypeScript via modelgen. +""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional +from uuid import UUID + + +class JobStatus(str, Enum): + PENDING = "pending" + RUNNING = "running" + PAUSED = "paused" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class RunType(str, Enum): + INITIAL = "initial" + REPLAY = "replay" + RETRY = "retry" + + +@dataclass +class Job: + """ + A pipeline job. + + Each invocation (initial run, replay, retry) creates a Job. + Jobs for the same source are linked via parent_id. + """ + + id: UUID + + # Input + source_asset_id: UUID + video_path: str + profile_name: str = "soccer_broadcast" + + # Lineage + parent_id: Optional[UUID] = None + run_type: RunType = RunType.INITIAL + config_overrides: Dict[str, Any] = field(default_factory=dict) + + # Status + status: JobStatus = JobStatus.PENDING + current_stage: Optional[str] = None + progress: float = 0.0 + error_message: Optional[str] = None + + # Results summary + total_detections: int = 0 + brands_found: int = 0 + cloud_llm_calls: int = 0 + estimated_cost_usd: float = 0.0 + + # Worker tracking + celery_task_id: Optional[str] = None + priority: int = 0 + + # Timestamps + created_at: Optional[datetime] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + +@dataclass +class Timeline: + """ + The frame sequence from a source video. 
+ + Independent of stages — exists before any stage runs. + Frames stored in MinIO as JPEGs, metadata here. + One timeline per job. + """ + + id: UUID + source_asset_id: Optional[UUID] = None + source_video: str = "" + profile_name: str = "" + fps: float = 2.0 + + frames_prefix: str = "" # s3: timeline/{id}/frames/ + frames_manifest: Dict[int, str] = field(default_factory=dict) # seq → s3 key + frames_meta: List[Dict[str, Any]] = field(default_factory=list) + + created_at: Optional[datetime] = None + + +@dataclass +class Checkpoint: + """ + A snapshot of pipeline state on a timeline. + + Stage outputs stored as JSONB — each stage serializes to JSON, + the checkpoint stores it without knowing the shape. + + parent_id forms a tree: multiple children from the same parent + = different config tries from the same starting point. + """ + + id: UUID + timeline_id: UUID + parent_id: Optional[UUID] = None # null = root checkpoint + + # Stage outputs — JSONB per stage, opaque to the checkpoint layer + stage_outputs: Dict[str, Any] = field(default_factory=dict) + + # Config that produced this checkpoint + config_overrides: Dict[str, Any] = field(default_factory=dict) + + # Pipeline state + stats: Dict[str, Any] = field(default_factory=dict) + + # Scenario bookmark + is_scenario: bool = False + scenario_label: str = "" + + created_at: Optional[datetime] = None + + +# --- Brands --- + +class BrandSource(str, Enum): + OCR = "ocr" + VLM = "local_vlm" + CLOUD = "cloud_llm" + MANUAL = "manual" + + +@dataclass +class Brand: + """ + A brand discovered or registered in the system. + + Airings track where/when the brand appeared — each airing + references a timeline and a frame range. 
+ """ + + id: UUID + canonical_name: str + aliases: List[str] = field(default_factory=list) + source: BrandSource = BrandSource.OCR # how first discovered + confirmed: bool = False + + # Airings — JSONB array of appearances + # [{timeline_id, frame_start, frame_end, confidence, source, timestamp}] + airings: List[Dict[str, Any]] = field(default_factory=list) + total_airings: int = 0 + + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None