""" S3 storage layer. Uses MinIO locally (S3-compatible) and real AWS S3 in production. The only difference is S3_ENDPOINT_URL: set for MinIO, omit for AWS. """ import os import tempfile from pathlib import Path from typing import Optional import boto3 from botocore.config import Config BUCKET_IN = os.environ.get("S3_BUCKET_IN", "mpr-media-in") BUCKET_OUT = os.environ.get("S3_BUCKET_OUT", "mpr-media-out") def get_s3_client(): """Get a boto3 S3 client. Works with both MinIO and real AWS S3.""" kwargs = { "region_name": os.environ.get("AWS_REGION", "us-east-1"), "config": Config(signature_version="s3v4"), } endpoint = os.environ.get("S3_ENDPOINT_URL") if endpoint: kwargs["endpoint_url"] = endpoint kwargs["aws_access_key_id"] = os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin") kwargs["aws_secret_access_key"] = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin") return boto3.client("s3", **kwargs) def list_objects(bucket: str, prefix: str = "", extensions: Optional[set] = None) -> list[dict]: """List objects in an S3 bucket, optionally filtered by file extension.""" s3 = get_s3_client() objects = [] kwargs = {"Bucket": bucket, "Prefix": prefix} while True: response = s3.list_objects_v2(**kwargs) for obj in response.get("Contents", []): key = obj["Key"] if extensions: ext = Path(key).suffix.lower() if ext not in extensions: continue objects.append({ "key": key, "size": obj["Size"], "filename": Path(key).name, }) if not response.get("IsTruncated"): break kwargs["ContinuationToken"] = response["NextContinuationToken"] return objects def download_file(bucket: str, key: str, local_path: str) -> str: """Download a file from S3 to a local path.""" s3 = get_s3_client() Path(local_path).parent.mkdir(parents=True, exist_ok=True) s3.download_file(bucket, key, local_path) return local_path def download_to_temp(bucket: str, key: str) -> str: """Download a file from S3 to a temp file. Caller must clean up.""" ext = Path(key).suffix fd, tmp_path = tempfile.mkstemp(suffix=ext) os.close(fd) download_file(bucket, key, tmp_path) return tmp_path def upload_file(local_path: str, bucket: str, key: str) -> None: """Upload a local file to S3.""" s3 = get_s3_client() s3.upload_file(local_path, bucket, key) def get_presigned_url(bucket: str, key: str, expires: int = 3600) -> str: """Generate a presigned URL for an S3 object.""" s3 = get_s3_client() return s3.generate_presigned_url( "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expires, )