# Lambda: presign a GET URL for every .pdf under PREFIX and publish an
# NDJSON manifest of {key, url} records back to the same bucket.
import asyncio
import contextlib
import json
import os
import uuid

import aioboto3
import aiofiles
BUCKET = os.environ.get("BUCKET_NAME", "my-company-reports-bucket")
PREFIX = os.environ.get("PREFIX", "2026/04/")
EXPIRY = int(os.environ.get("URL_EXPIRY_SECONDS", "900"))
ENDPOINT = os.environ.get("S3_ENDPOINT_URL") or None
QUEUE_MAX = int(os.environ.get("QUEUE_MAX", "2000"))
_DONE = object()
async def _run():
session = aioboto3.Session()
async with session.client("s3", endpoint_url=ENDPOINT) as s3:
queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAX)
manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"
async def producer():
paginator = s3.get_paginator("list_objects_v2")
try:
async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}):
for obj in page.get("Contents", []) or []:
key = obj["Key"]
if key.lower().endswith(".pdf"):
await queue.put(key)
finally:
await queue.put(_DONE)
async def consumer():
count = 0
async with aiofiles.open(manifest_path, "w") as f:
while True:
item = await queue.get()
if item is _DONE:
break
url = await s3.generate_presigned_url(
"get_object",
Params={"Bucket": BUCKET, "Key": item},
ExpiresIn=EXPIRY,
)
await f.write(json.dumps({"key": item, "url": url}) + "\n")
count += 1
return count
prod_task = asyncio.create_task(producer())
count = await consumer()
await prod_task
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
async with aiofiles.open(manifest_path, "rb") as f:
body = await f.read()
await s3.put_object(
Bucket=BUCKET,
Key=manifest_key,
Body=body,
ContentType="application/x-ndjson",
)
manifest_url = await s3.generate_presigned_url(
"get_object",
Params={"Bucket": BUCKET, "Key": manifest_key},
ExpiresIn=EXPIRY,
)
os.unlink(manifest_path)
return {
"count": count,
"manifest_key": manifest_key,
"manifest_url": manifest_url,
}
def handler(event, context):
result = asyncio.run(_run())
return {"statusCode": 200, "body": json.dumps(result)}