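"""Build a presigned-URL manifest for the PDF objects under a bucket prefix.

Lists every ``.pdf`` object under ``PREFIX`` in ``BUCKET``, presigns a GET URL
for each one, writes the results as JSON Lines, uploads that manifest to
``manifests/<uuid>.jsonl`` in the same bucket and returns a presigned URL for it.
"""
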
import asyncio
import contextlib
import json
import os
import uuid

import aioboto3
import aiofiles

# Configuration, read once from the environment when the module is imported.

# Bucket that is listed for PDFs and that also receives the generated manifest.
BUCKET = os.getenv("BUCKET_NAME", "my-company-reports-bucket")
# Key prefix the object listing is restricted to.
PREFIX = os.getenv("PREFIX", "2026/04/")
# Lifetime, in seconds, of every presigned URL this module produces.
EXPIRY = int(os.getenv("URL_EXPIRY_SECONDS", "900"))
# Optional custom S3 endpoint; an unset *or empty* variable means "use default".
ENDPOINT = os.getenv("S3_ENDPOINT_URL") or None
# Maximum number of keys buffered between the lister and the presigner.
QUEUE_MAX = int(os.getenv("QUEUE_MAX", "2000"))

# Unique sentinel the producer enqueues to tell the consumer the listing ended.
_DONE = object()


async def _run():
    """List PDFs under ``PREFIX``, presign each, and publish a JSONL manifest.

    A producer task pages through ``list_objects_v2`` for ``BUCKET``/``PREFIX``
    and pushes every key ending in ``.pdf`` (case-insensitive) onto a bounded
    queue. The consumer presigns a GET URL for each key and appends one
    ``{"key": ..., "url": ...}`` JSON line to a local temp file. That file is then
    uploaded to ``manifests/<uuid4>.jsonl`` in ``BUCKET`` and presigned as well.

    Returns:
        dict with ``count`` (number of manifest entries written),
        ``manifest_key`` (S3 key of the uploaded manifest) and
        ``manifest_url`` (presigned GET URL for it, valid ``EXPIRY`` seconds).

    Raises:
        Whatever the S3 client or file I/O raises. A listing failure is
        re-raised here (and no manifest is uploaded) instead of leaving the
        consumer blocked on the queue forever.
    """
    session = aioboto3.Session()
    async with session.client("s3", endpoint_url=ENDPOINT) as s3:
        # Bounded so a fast listing cannot run arbitrarily far ahead of the
        # presign-and-write consumer.
        queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAX)
        manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"

        async def producer():
            """Enqueue matching keys, then ``_DONE`` (also on listing failure)."""
            try:
                paginator = s3.get_paginator("list_objects_v2")
                async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
                    # "Contents" is absent on an empty page.
                    for obj in page.get("Contents") or []:
                        key = obj["Key"]
                        if key.lower().endswith(".pdf"):
                            await queue.put(key)
            except Exception:
                # Wake the consumer so it stops waiting on queue.get(); the
                # error itself resurfaces when this task is awaited. A
                # CancelledError is not an Exception, so a cancelled producer
                # does not try to enqueue into a queue nobody is draining.
                await queue.put(_DONE)
                raise
            await queue.put(_DONE)

        async def consumer():
            """Presign each queued key into the manifest file; return the count."""
            count = 0
            async with aiofiles.open(manifest_path, "w") as f:
                while True:
                    item = await queue.get()
                    if item is _DONE:
                        break
                    url = await s3.generate_presigned_url(
                        "get_object",
                        Params={"Bucket": BUCKET, "Key": item},
                        ExpiresIn=EXPIRY,
                    )
                    await f.write(json.dumps({"key": item, "url": url}) + "\n")
                    count += 1
            return count

        try:
            prod_task = asyncio.create_task(producer())
            try:
                count = await consumer()
            except BaseException:
                # The consumer died: stop the producer (it may be blocked on a
                # full queue) and reap it without letting its outcome mask the
                # consumer's exception.
                prod_task.cancel()
                await asyncio.gather(prod_task, return_exceptions=True)
                raise
            # Re-raises any listing error, so a partial manifest is never uploaded.
            await prod_task

            manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
            async with aiofiles.open(manifest_path, "rb") as f:
                body = await f.read()
            await s3.put_object(
                Bucket=BUCKET,
                Key=manifest_key,
                Body=body,
                ContentType="application/x-ndjson",
            )
            manifest_url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": BUCKET, "Key": manifest_key},
                ExpiresIn=EXPIRY,
            )
        finally:
            # Remove the local manifest on success *and* failure so repeated or
            # failed runs do not accumulate files in /tmp. It may not exist if
            # the consumer never got to open it.
            with contextlib.suppress(FileNotFoundError):
                os.unlink(manifest_path)

        return {
            "count": count,
            "manifest_key": manifest_key,
            "manifest_url": manifest_url,
        }


def handler(event, context):
    """Run the manifest build and wrap its summary in a status/body response.

    Presumably invoked as an AWS Lambda handler; ``event`` and ``context`` are
    accepted for that signature but not read. Any exception raised by ``_run``
    propagates unchanged.

    Returns:
        ``{"statusCode": 200, "body": <JSON string of _run()'s result>}``.
    """
    summary = asyncio.run(_run())
    response = {"statusCode": 200}
    response["body"] = json.dumps(summary)
    return response