272 lines
10 KiB
Python
272 lines
10 KiB
Python
"""sign_pdfs — production handler.
|
|
|
|
Generates 15-minute presigned S3 URLs for every PDF under a bucket+prefix,
|
|
writes a JSONL manifest to S3, returns the manifest URL.
|
|
|
|
This is the canonical version. functions/sign_pdfs_v1/ keeps the original
|
|
naive implementation around for the local "before/after" comparison in the
|
|
tester UI; only this one gets deployed to AWS.
|
|
|
|
Production refinements vs sign_pdfs_v1 (see docs/lambdas-md/lambda-20-sign-pdfs.md):
|
|
|
|
1. Config from the event payload (bucket / prefix / expiry / page_size / concurrency);
   env vars are the fallback for bucket, prefix and expiry only. The same
   function can serve many prefixes.
|
|
2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms.
|
|
3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount,
|
|
ManifestBytes, ResponseBytes.
|
|
4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel
|
|
per consumer, not a single _DONE).
|
|
5. Consumer per-item try/except — failed presigns count into errors, batch survives.
|
|
6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM.
|
|
7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes.
|
|
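8. N concurrent consumers via asyncio.gather — presign throughput scales with
   the concurrency knob in the event.
9. Optional idempotency: when DEDUP_TABLE is set, a DynamoDB item keyed by a
   hash of (bucket, prefix) caches the result for 24h. A cache hit returns the
   cached result with a freshly presigned manifest URL and "idempotent": true.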
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
import uuid
|
|
|
|
import aioboto3
|
|
import aiofiles
|
|
|
|
# Queue sentinel: when listing ends, the producer enqueues one of these per
# consumer, and each consumer stops when it dequeues it (compared with `is`).
# A bare object() can never collide with a real S3 key string.
_DONE: object = object()
|
|
|
|
|
|
def _dedup_key(cfg: dict) -> str:
    """Derive the idempotency key for a job from its bucket and prefix.

    Two invocations with the same (bucket, prefix) are the same logical job.
    The pair is joined with "#" and SHA-256 hashed. Only the first 32 hex
    characters (128 bits) are returned. This keeps the DynamoDB key length
    bounded regardless of prefix length and keeps the bucket/prefix out of
    the key in cleartext.
    """
    identity = "{}#{}".format(cfg["bucket"], cfg["prefix"])
    digest = hashlib.sha256(identity.encode("utf-8"))
    return digest.hexdigest()[:32]
|
|
|
|
|
|
def _cfg(event: dict) -> dict:
    """Resolve the job configuration: event payload first, env vars second.

    ``bucket`` and ``prefix`` are required. There is deliberately no baked-in
    default (no "2026/04/"), because that is a deployment-time concern, not
    handler code. If the event omits them, ``BUCKET_NAME`` / ``PREFIX`` must be
    set in the environment. Falsy event values (``0``, ``""``, ``None``) count
    as "not supplied" and fall through to the next source.

    Returns:
        dict with these keys:

        * ``bucket``
        * ``prefix``
        * ``expiry``: presigned-URL lifetime in seconds; event
          ``expiry_seconds``, then ``URL_EXPIRY_SECONDS``, then 900.
        * ``endpoint``: optional ``S3_ENDPOINT_URL`` override, else None.
        * ``page_size``: ListObjectsV2 page size; default and cap 1000.
        * ``concurrency``: number of presign consumers; default 4.
        * ``queue_max``: work-queue bound; ``QUEUE_MAX``, default 2000.

    Raises:
        KeyError: bucket or prefix supplied by neither the event nor the env.
        ValueError: a numeric setting cannot be parsed by ``int()``, or
            expiry / page_size / concurrency is below 1.
    """
    bucket = event.get("bucket") or os.environ["BUCKET_NAME"]
    prefix = event.get("prefix") or os.environ["PREFIX"]  # e.g. "2026/04/"
    expiry = int(
        event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")
    )
    # S3 never returns more than 1000 keys per ListObjectsV2 page. That makes
    # 1000 both the fewest-round-trips default and the meaningful upper bound.
    s3_max_keys = 1000
    page_size = min(int(event.get("page_size") or s3_max_keys), s3_max_keys)
    concurrency = int(event.get("concurrency") or 4)

    # Fail loud on values that break the pipeline:
    # - concurrency < 1 starts no consumers, so the producer can block forever
    #   once the bounded queue fills;
    # - a non-positive expiry yields URLs that are already expired.
    for name, value in (
        ("expiry_seconds", expiry),
        ("page_size", page_size),
        ("concurrency", concurrency),
    ):
        if value < 1:
            raise ValueError(f"{name} must be >= 1, got {value}")

    return {
        "bucket": bucket,
        "prefix": prefix,
        "expiry": expiry,
        "endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
        "page_size": page_size,
        "concurrency": concurrency,
        "queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
    }
|
|
|
|
|
|
def _log(event_type: str, **fields):
    """Print one structured JSON log line to stdout.

    The record is ``{"event": event_type, **fields}``, with ``event`` as the
    first key. A caller-supplied ``event=`` keyword would replace its value.
    Field values must be JSON-serializable, or ``json.dumps`` raises.
    """
    record = {"event": event_type}
    record.update(fields)
    print(json.dumps(record))
|
|
|
|
|
|
def _emit_emf(metrics: dict, **dims):
    """Print one CloudWatch Embedded Metric Format (EMF) record to stdout.

    Args:
        metrics: metric name -> numeric value. Names ending in "Bytes" are
            declared with unit ``Bytes``; every other name uses ``Count``.
        **dims: dimension name -> value. All of them form a single dimension
            set in the ``eth/sign_pdfs`` namespace.

    The dimension and metric values are also written as top-level members of
    the record, as EMF expects. Metrics win if a name collides with a
    dimension.
    """
    metric_definitions = [
        {"Name": name, "Unit": "Bytes" if name.endswith("Bytes") else "Count"}
        for name in metrics
    ]
    directive = {
        "Namespace": "eth/sign_pdfs",
        "Dimensions": [list(dims)],
        "Metrics": metric_definitions,
    }
    record = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [directive],
        },
        **dims,
        **metrics,
    }
    print(json.dumps(record))
|
|
|
|
|
|
async def _dedup_lookup(session, table_name: str, dedup_key: str, request_id: str):
    """Fetch a previously cached result for ``dedup_key`` from DynamoDB.

    Returns the decoded result dict, or ``None`` in any of these cases:

    * there is no entry;
    * the lookup raised;
    * the stored entry cannot be decoded or lacks ``manifest_key``.

    Each failure is logged and treated as a cache miss, so the job is
    recomputed rather than failing the invocation.
    """
    async with session.resource("dynamodb") as ddb:
        table = await ddb.Table(table_name)
        try:
            resp = await table.get_item(Key={"id": dedup_key})
        except Exception as exc:
            _log("dedup_lookup_error", request_id=request_id, error=str(exc))
            return None
    item = resp.get("Item")
    if item is None:
        return None
    try:
        cached = json.loads(item["result"])
        if not isinstance(cached, dict) or "manifest_key" not in cached:
            raise ValueError("cached result has no manifest_key")
    except (KeyError, TypeError, ValueError) as exc:
        _log(
            "dedup_cache_invalid",
            request_id=request_id,
            dedup_key=dedup_key,
            error=str(exc),
        )
        return None
    return cached


async def _dedup_store(session, table_name: str, dedup_key: str, result: dict, request_id: str):
    """Write ``result`` under ``dedup_key`` with a 24h ``ttl`` attribute (best effort).

    A write failure is logged rather than raised: the actual work already
    succeeded, and the log tells us when dedup is broken.
    """
    async with session.resource("dynamodb") as ddb:
        table = await ddb.Table(table_name)
        try:
            await table.put_item(
                Item={
                    "id": dedup_key,
                    "result": json.dumps(result),
                    "ttl": int(time.time()) + 86400,  # 24h
                }
            )
        except Exception as exc:
            _log("dedup_write_error", request_id=request_id, error=str(exc))


async def _run(event: dict, request_id: str):
    """Presign every ``.pdf`` under the configured prefix and publish a JSONL manifest.

    Flow:

    1. Resolve config, then run the optional DynamoDB idempotency lookup.
    2. One producer task lists S3 and feeds ``cfg["concurrency"]`` presigning
       consumers through a bounded queue.
    3. Consumers write manifest lines to a /tmp file.
    4. The file is uploaded under ``manifests/`` and presigned.
    5. Emit the log line and EMF metrics, then cache fully successful results.

    Returns:
        dict with ``count`` (manifest lines written), ``errors`` (keys whose
        presign/write failed), ``manifest_key`` and ``manifest_url``. On a cache
        hit, returns the stored dict with a fresh ``manifest_url`` and
        ``idempotent=True``.

    Raises:
        Whatever ``_cfg`` raises for missing or invalid config, plus S3 errors
        from listing, manifest upload or manifest presigning. Per-key presign
        failures are counted in ``errors`` instead of raising.
    """
    cfg = _cfg(event)
    _log(
        "start",
        request_id=request_id,
        bucket=cfg["bucket"],
        prefix=cfg["prefix"],
        page_size=cfg["page_size"],
        concurrency=cfg["concurrency"],
    )
    t0 = time.monotonic()

    session = aioboto3.Session()
    dedup_table = os.environ.get("DEDUP_TABLE")
    dedup_key = _dedup_key(cfg)

    # Idempotency check: return the cached result if (bucket, prefix) was
    # processed within the TTL window. The manifest URL is regenerated so the
    # cached response stays usable even after the original URL expired.
    # NOTE: this is a read-then-write cache, not a lock. Two concurrent
    # invocations that both miss will both do the work.
    if dedup_table:
        cached = await _dedup_lookup(session, dedup_table, dedup_key, request_id)
        if cached is not None:
            async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
                cached["manifest_url"] = await s3.generate_presigned_url(
                    "get_object",
                    Params={"Bucket": cfg["bucket"], "Key": cached["manifest_key"]},
                    ExpiresIn=cfg["expiry"],
                )
            cached["idempotent"] = True
            _log(
                "cache_hit",
                request_id=request_id,
                dedup_key=dedup_key,
                count=cached.get("count"),
                duration_ms=round((time.monotonic() - t0) * 1000, 2),
            )
            return cached

    pages = 0
    errors = 0
    manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"
    async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
        queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"])

        async def producer():
            """List the prefix and enqueue every key ending in ".pdf" (case-insensitive)."""
            nonlocal pages
            try:
                paginator = s3.get_paginator("list_objects_v2")
                async for page in paginator.paginate(
                    Bucket=cfg["bucket"],
                    Prefix=cfg["prefix"],
                    PaginationConfig={"PageSize": cfg["page_size"]},
                ):
                    pages += 1
                    for obj in page.get("Contents") or []:
                        key = obj["Key"]
                        if key.lower().endswith(".pdf"):
                            await queue.put(key)
            finally:
                # One sentinel per consumer so every gather() task exits,
                # even when listing raised part-way through.
                for _ in range(cfg["concurrency"]):
                    await queue.put(_DONE)

        try:
            write_lock = asyncio.Lock()
            async with aiofiles.open(manifest_path, "w") as f:

                async def consumer():
                    """Presign queued keys until a sentinel arrives; return lines written."""
                    nonlocal errors
                    written = 0
                    while True:
                        item = await queue.get()
                        if item is _DONE:
                            return written
                        try:
                            url = await s3.generate_presigned_url(
                                "get_object",
                                Params={"Bucket": cfg["bucket"], "Key": item},
                                ExpiresIn=cfg["expiry"],
                            )
                            line = json.dumps({"key": item, "url": url}) + "\n"
                            # Only one write in flight at a time across consumers.
                            async with write_lock:
                                await f.write(line)
                            written += 1
                        except Exception as exc:
                            # One bad key must not sink the batch; count it and go on.
                            errors += 1
                            _log(
                                "presign_error",
                                request_id=request_id,
                                key=item,
                                error=str(exc),
                            )

                prod_task = asyncio.create_task(producer())
                counts = await asyncio.gather(
                    *(consumer() for _ in range(cfg["concurrency"]))
                )
                # Re-raises any listing error once the consumers have drained.
                await prod_task

            count = sum(counts)
            manifest_bytes = os.path.getsize(manifest_path)
            manifest_key = f"manifests/{uuid.uuid4()}.jsonl"

            # Sync file handle as Body: aiobotocore reads it in chunks instead
            # of buffering the whole manifest in memory like `await f.read()`.
            with open(manifest_path, "rb") as body:
                await s3.put_object(
                    Bucket=cfg["bucket"],
                    Key=manifest_key,
                    Body=body,
                    ContentType="application/x-ndjson",
                )

            manifest_url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": cfg["bucket"], "Key": manifest_key},
                ExpiresIn=cfg["expiry"],
            )
        finally:
            # /tmp survives across warm invocations of the same container, so
            # remove the scratch manifest on failure paths too. It may not
            # exist if opening it failed.
            try:
                os.unlink(manifest_path)
            except FileNotFoundError:
                pass

    result = {
        "count": count,
        "errors": errors,
        "manifest_key": manifest_key,
        "manifest_url": manifest_url,
    }
    response_bytes = len(json.dumps(result))
    duration_ms = (time.monotonic() - t0) * 1000

    _log(
        "complete",
        request_id=request_id,
        count=count,
        errors=errors,
        pages=pages,
        duration_ms=round(duration_ms, 2),
    )
    _emit_emf(
        {
            "PDFsProcessed": count,
            "S3ListPages": pages,
            "PresignCount": count,
            "ManifestBytes": manifest_bytes,
            "ResponseBytes": response_bytes,
        },
        Function="sign_pdfs",
    )

    if dedup_table:
        if errors:
            # A partial result must not be replayed from the cache for 24h;
            # leave the key uncached so a retry redoes the work.
            _log(
                "dedup_skip_partial",
                request_id=request_id,
                dedup_key=dedup_key,
                errors=errors,
            )
        else:
            await _dedup_store(session, dedup_table, dedup_key, result, request_id)

    return result
|
|
|
|
|
|
def handler(event, context):
    """AWS Lambda entry point: run the signing job and wrap its result.

    Args:
        event: invocation payload. Optional keys are ``bucket``, ``prefix``,
            ``expiry_seconds``, ``page_size`` and ``concurrency``. A ``None``
            payload is treated as ``{}``, so the env-var fallbacks apply
            instead of crashing on ``event.get``.
        context: Lambda context. Its ``aws_request_id`` is the log correlation
            id; when it is absent or empty (e.g. local runs), a random UUID is
            used.

    Returns:
        ``{"statusCode": 200, "body": <JSON-encoded result dict>}``. Errors
        raised by the job propagate to the runtime.
    """
    request_id = getattr(context, "aws_request_id", None) or str(uuid.uuid4())
    result = asyncio.run(_run(event or {}, request_id))
    return {"statusCode": 200, "body": json.dumps(result)}
|