Files
lambda_studio/functions/sign_pdfs/handler.py

215 lines
7.5 KiB
Python

"""sign_pdfs — production handler.
Generates presigned S3 URLs (15-minute expiry by default) for every PDF under a bucket+prefix,
writes a JSONL manifest to S3, returns the manifest URL.
This is the canonical version. functions/sign_pdfs_v1/ keeps the original
naive implementation around for the local "before/after" comparison in the
tester UI; only this one gets deployed to AWS.
Production refinements vs sign_pdfs_v1 (see docs/lambdas-md/lambda-20-sign-pdfs.md):
1. Config from the event payload (bucket / prefix / expiry_seconds / page_size /
concurrency). bucket, prefix and expiry fall back to env vars; page_size and
concurrency fall back to built-in defaults. Same function can serve many prefixes.
2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms.
3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount,
ManifestBytes, ResponseBytes.
4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel
per consumer, not a single _DONE).
5. Consumer per-item try/except — failed presigns count into errors, batch survives.
6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM.
7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes.
8. N concurrent consumers via asyncio.gather — presign throughput scales with
the concurrency knob in the event.
"""
import asyncio
import json
import os
import time
import uuid
import aioboto3
import aiofiles
# Queue sentinel. After the listing finishes, the producer enqueues one of
# these per consumer. Consumers test it by identity (`item is _DONE`), so a
# fresh bare object can never be confused with a real S3 key string.
_DONE = object()
def _cfg(event: dict) -> dict:
    """Resolve per-invocation configuration from the event, falling back to env.

    Event keys: ``bucket``, ``prefix``, ``expiry_seconds``, ``page_size``,
    ``concurrency``. Env fallbacks: ``BUCKET_NAME``, ``PREFIX``,
    ``URL_EXPIRY_SECONDS`` (default 900). Env-only settings:
    ``S3_ENDPOINT_URL`` and ``QUEUE_MAX`` (default 2000).

    Falsy event values (missing, ``None``, ``""``, ``0``) use the fallback.

    Args:
        event: Lambda invocation payload. ``None`` is treated as ``{}``.

    Returns:
        dict with ``bucket``, ``prefix``, ``expiry``, ``endpoint``,
        ``page_size``, ``concurrency`` and ``queue_max``.

    Raises:
        KeyError: bucket or prefix is supplied by neither the event nor the
            environment.
        ValueError: expiry, page_size or concurrency is not a positive integer.
    """
    event = event or {}

    def _positive_int(name: str, raw) -> int:
        # Values may come from JSON as strings ("8") or ints. A non-positive
        # concurrency would start no consumers and stall the producer on the
        # bounded queue, so reject such values up front instead of hanging.
        value = int(raw)
        if value < 1:
            raise ValueError(f"{name} must be a positive integer, got {raw!r}")
        return value

    # bucket and prefix are required — fail loud if neither event nor env
    # supplies them. The function deliberately has no fallback default (no
    # "2026/04/" baked in) because that's a deployment-time concern, not
    # handler code.
    return {
        "bucket": event.get("bucket") or os.environ["BUCKET_NAME"],
        "prefix": event.get("prefix") or os.environ["PREFIX"],  # e.g. "2026/04/"
        "expiry": _positive_int(
            "expiry_seconds",
            event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900"),
        ),
        "endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
        # 1000 is the most keys a single ListObjectsV2 page returns, so it
        # minimises round-trips on large prefixes.
        "page_size": _positive_int("page_size", event.get("page_size") or 1000),
        "concurrency": _positive_int("concurrency", event.get("concurrency") or 4),
        # asyncio.Queue treats maxsize <= 0 as unbounded, so no check here.
        "queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
    }
def _log(event_type: str, **fields):
    """Print one structured log line: a JSON object led by ``event``, then *fields*.

    Later keys win, so a caller-supplied ``event=`` field replaces *event_type*
    while keeping the key in first position.
    """
    record = {"event": event_type}
    record.update(fields)
    print(json.dumps(record))
def _emit_emf(metrics: dict, **dims):
    """Print a single CloudWatch Embedded Metric Format record to stdout.

    Args:
        metrics: metric name -> value. Names ending in "Bytes" get the
            ``Bytes`` unit; every other metric is reported as ``Count``.
        **dims: dimension name -> value. All of them form one dimension set.

    The record is ordered ``_aws`` metadata first, then dimension values,
    then metric values.
    """
    definitions = []
    for metric_name in metrics:
        unit = "Bytes" if metric_name.endswith("Bytes") else "Count"
        definitions.append({"Name": metric_name, "Unit": unit})

    directive = {
        "Namespace": "eth/sign_pdfs",
        "Dimensions": [list(dims)],
        "Metrics": definitions,
    }
    envelope = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # epoch milliseconds
            "CloudWatchMetrics": [directive],
        },
    }
    envelope.update(dims)
    envelope.update(metrics)
    print(json.dumps(envelope))
async def _run(event: dict, request_id: str):
    """Presign every PDF under the configured bucket/prefix and publish a manifest.

    One producer pages through ``list_objects_v2`` and enqueues ``.pdf`` keys.
    ``concurrency`` consumers presign them and append one JSON line per key to
    a local temp file. That file is uploaded to ``manifests/<uuid>.jsonl`` in
    the same bucket, and the uploaded manifest is itself presigned.

    Args:
        event: invocation payload; see ``_cfg`` for the recognised keys.
        request_id: correlation id included in every log line.

    Returns:
        dict with ``count`` (URLs written), ``errors`` (failed presigns),
        ``manifest_key`` and ``manifest_url``.

    Raises:
        Whatever the S3 listing or manifest upload raises. The local temp
        manifest is removed on every exit path.
    """
    cfg = _cfg(event)
    _log(
        "start",
        request_id=request_id,
        bucket=cfg["bucket"],
        prefix=cfg["prefix"],
        page_size=cfg["page_size"],
        concurrency=cfg["concurrency"],
    )
    t0 = time.monotonic()
    pages = 0
    errors = 0
    manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"
    session = aioboto3.Session()
    try:
        async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
            # Bounded: put() blocks once queue_max keys are waiting, so the
            # listing cannot run arbitrarily far ahead of the consumers.
            queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"])

            async def producer():
                """List the prefix page by page and enqueue every ``.pdf`` key."""
                nonlocal pages
                try:
                    paginator = s3.get_paginator("list_objects_v2")
                    async for page in paginator.paginate(
                        Bucket=cfg["bucket"],
                        Prefix=cfg["prefix"],
                        PaginationConfig={"PageSize": cfg["page_size"]},
                    ):
                        pages += 1
                        # "Contents" is absent when a page holds no keys.
                        for obj in page.get("Contents", []) or []:
                            key = obj["Key"]
                            if key.lower().endswith(".pdf"):
                                await queue.put(key)
                finally:
                    # One sentinel per consumer so each gather() task exits
                    # cleanly, even if the listing raises part-way through.
                    for _ in range(cfg["concurrency"]):
                        await queue.put(_DONE)

            write_lock = asyncio.Lock()
            async with aiofiles.open(manifest_path, "w") as f:

                async def consumer():
                    """Presign queued keys until a sentinel arrives; return lines written."""
                    nonlocal errors
                    written = 0
                    while True:
                        item = await queue.get()
                        if item is _DONE:
                            return written
                        try:
                            url = await s3.generate_presigned_url(
                                "get_object",
                                Params={"Bucket": cfg["bucket"], "Key": item},
                                ExpiresIn=cfg["expiry"],
                            )
                            line = json.dumps({"key": item, "url": url}) + "\n"
                            # One writer at a time so manifest lines never interleave.
                            async with write_lock:
                                await f.write(line)
                            written += 1
                        except Exception as exc:
                            # Per-item isolation: a failing key is counted and
                            # logged, and the rest of the batch carries on.
                            errors += 1
                            _log(
                                "presign_error",
                                request_id=request_id,
                                key=item,
                                error=str(exc),
                            )

                prod_task = asyncio.create_task(producer())
                counts = await asyncio.gather(
                    *(consumer() for _ in range(cfg["concurrency"]))
                )
                # Re-raises any listing error once the consumers have drained.
                await prod_task

            # The aiofiles handle is closed at this point, so the on-disk size
            # is final.
            count = sum(counts)
            manifest_bytes = os.path.getsize(manifest_path)
            manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
            # Sync file handle as Body — aiobotocore reads in chunks instead of
            # buffering the whole manifest in memory like `body = await f.read()` would.
            with open(manifest_path, "rb") as body:
                await s3.put_object(
                    Bucket=cfg["bucket"],
                    Key=manifest_key,
                    Body=body,
                    ContentType="application/x-ndjson",
                )
            manifest_url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": cfg["bucket"], "Key": manifest_key},
                ExpiresIn=cfg["expiry"],
            )
    finally:
        # Remove the local manifest on success AND failure. /tmp can persist
        # across warm invocations of the same execution environment, so files
        # leaked by failed runs would otherwise pile up.
        try:
            os.unlink(manifest_path)
        except FileNotFoundError:
            pass  # failed before the manifest file was created

    result = {
        "count": count,
        "errors": errors,
        "manifest_key": manifest_key,
        "manifest_url": manifest_url,
    }
    response_bytes = len(json.dumps(result))
    duration_ms = (time.monotonic() - t0) * 1000
    _log(
        "complete",
        request_id=request_id,
        count=count,
        errors=errors,
        pages=pages,
        duration_ms=round(duration_ms, 2),
    )
    _emit_emf(
        {
            "PDFsProcessed": count,
            "S3ListPages": pages,
            "PresignCount": count,
            "ManifestBytes": manifest_bytes,
            "ResponseBytes": response_bytes,
        },
        Function="sign_pdfs",
    )
    return result
def handler(event, context):
    """AWS Lambda entry point: run the async pipeline and wrap its result.

    Uses ``context.aws_request_id`` when the context provides one. Otherwise,
    e.g. for local invocation with a stub or ``None`` context, it uses a
    freshly generated UUID. Returns ``statusCode`` 200 with the JSON-encoded
    ``_run`` result as ``body``. Exceptions from ``_run`` propagate.
    """
    fallback_id = str(uuid.uuid4())
    request_id = getattr(context, "aws_request_id", fallback_id)
    body = json.dumps(asyncio.run(_run(event, request_id)))
    return dict(statusCode=200, body=body)