"""Lambda entry point that builds a manifest of presigned URLs for PDFs in S3.

Object keys under ``PREFIX`` in ``BUCKET`` are listed, a presigned GET URL is
generated for every key ending in ``.pdf`` (case-insensitive), the results are
written as JSON lines to a temporary file, and that file is uploaded back to
the bucket under ``manifests/``.
"""
import asyncio
import contextlib
import json
import os
import uuid

import aioboto3
import aiofiles

BUCKET = os.environ.get("BUCKET_NAME", "my-company-reports-bucket")
PREFIX = os.environ.get("PREFIX", "2026/04/")
EXPIRY = int(os.environ.get("URL_EXPIRY_SECONDS", "900"))
ENDPOINT = os.environ.get("S3_ENDPOINT_URL") or None
QUEUE_MAX = int(os.environ.get("QUEUE_MAX", "2000"))

# Sentinel placed on the queue to tell the consumer that no more keys follow.
_DONE = object()


async def _produce_keys(s3, queue):
    """List objects under PREFIX and enqueue every key ending in ``.pdf``.

    ``_DONE`` is enqueued when listing finishes or fails, so the consumer
    always terminates; a listing error is re-raised afterwards and surfaces
    when the task is awaited.  On cancellation nothing is enqueued: the
    consumer is no longer reading, and a blocking ``put`` on a full queue
    would never return.
    """
    paginator = s3.get_paginator("list_objects_v2")
    try:
        async for page in paginator.paginate(
            Bucket=BUCKET,
            Prefix=PREFIX,
            PaginationConfig={"PageSize": 100},
        ):
            for obj in page.get("Contents") or []:
                key = obj["Key"]
                if key.lower().endswith(".pdf"):
                    await queue.put(key)
    except asyncio.CancelledError:
        # Listed first because CancelledError is an Exception subclass on
        # Python 3.7; it must not fall into the handler below.
        raise
    except Exception:
        await queue.put(_DONE)
        raise
    else:
        await queue.put(_DONE)


async def _write_manifest(s3, queue, manifest_path):
    """Drain *queue* until ``_DONE``, writing one JSON line per key.

    Each line has the form ``{"key": ..., "url": ...}`` where ``url`` is a
    presigned ``get_object`` URL valid for ``EXPIRY`` seconds.

    Returns:
        The number of lines written.
    """
    count = 0
    async with aiofiles.open(manifest_path, "w") as f:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": BUCKET, "Key": item},
                ExpiresIn=EXPIRY,
            )
            await f.write(json.dumps({"key": item, "url": url}) + "\n")
            count += 1
    return count


async def _run():
    """Build the manifest, upload it, and return a summary.

    Returns:
        A dict with ``count`` (number of PDF keys found), ``manifest_key``
        (the S3 key of the uploaded manifest) and ``manifest_url`` (a
        presigned GET URL for the manifest).

    Raises:
        Any exception raised while listing, presigning, writing or uploading.
        The temporary file is removed and the producer task is reaped in
        every case.
    """
    session = aioboto3.Session()
    async with session.client("s3", endpoint_url=ENDPOINT) as s3:
        queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAX)
        manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"

        prod_task = asyncio.create_task(_produce_keys(s3, queue))
        try:
            count = await _write_manifest(s3, queue, manifest_path)
            # Surfaces any listing error that ended production early.
            await prod_task

            manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
            async with aiofiles.open(manifest_path, "rb") as f:
                body = await f.read()
            await s3.put_object(
                Bucket=BUCKET,
                Key=manifest_key,
                Body=body,
                ContentType="application/x-ndjson",
            )
            manifest_url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": BUCKET, "Key": manifest_key},
                ExpiresIn=EXPIRY,
            )
        finally:
            # If the consumer failed, the producer may still be running (or
            # blocked on a full queue).  Stop it before the client closes;
            # cancel() is a no-op on a finished task.  gather() with
            # return_exceptions=True retrieves the task's outcome without
            # masking the exception that is already propagating.
            prod_task.cancel()
            await asyncio.gather(prod_task, return_exceptions=True)
            # The file is absent if opening it for writing failed.
            with contextlib.suppress(FileNotFoundError):
                os.unlink(manifest_path)

        return {
            "count": count,
            "manifest_key": manifest_key,
            "manifest_url": manifest_url,
        }


def handler(event, context):
    """Run the manifest job and wrap the summary in a 200 response.

    Args:
        event: Invocation payload; not used.
        context: Invocation context; not used.

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded ``body`` holding
        the summary returned by ``_run``.
    """
    result = asyncio.run(_run())
    return {"statusCode": 200, "body": json.dumps(result)}