16 KiB
sign_pdfs_optimized — Walkthrough
Production-refined fork of
sign_pdfs. Applies 8 improvements at once; meant to be compared side-by-side with the original in the local tester.
The original sign_pdfs is the "before" snapshot — it is left untouched. This file (sign_pdfs_optimized/handler.py) is the "after": same contract, same output shape, all the rough edges from the original fixed.
Block 1 — Imports and sentinel
import asyncio, json, os, time, uuid
import aioboto3
import aiofiles
_DONE = object()
Nothing new at the import level. _DONE is a sentinel object used to signal consumers that the producer has finished. Using a unique object (rather than None or a string) means no PDF key will ever accidentally match it.
Original issue fixed here: the original put a single _DONE on the queue regardless of how many consumers were running. With N consumers, one of them would consume the sentinel and exit, but the others would block forever waiting for a key that never arrives. The fix is one sentinel per consumer — that happens in the producer's finally block (Block 4).
Block 2 — Config from event (_cfg)
def _cfg(event: dict) -> dict:
return {
"bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
"prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"),
"expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
"endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
"page_size": int(event.get("page_size") or 1000),
"concurrency": int(event.get("concurrency") or 4),
"queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
}
Improvement 1 — event-driven config. The original hard-coded bucket, prefix, and expiry as module-level constants read once from env vars. That means one function deployment = one bucket/prefix combination. Here the event is the primary source; env vars are fallbacks. The same deployed function can serve 2026/04/, 2026/05/, or an entirely different bucket by changing only the event payload — no redeployment.
endpoint is env-only because it is infrastructure (the MinIO URL in local dev, absent in production) — callers should never need to specify it per-request.
queue_max bounds the in-memory queue so the producer can't buffer unbounded keys before consumers have a chance to drain it. At 1000 keys/page × 2000 queue slots, the producer is at most 2 pages ahead of the consumers.
Block 3 — Structured logging (_log) and EMF metrics (_emit_emf)
def _log(event_type: str, **fields):
print(json.dumps({"event": event_type, **fields}))
def _emit_emf(metrics: dict, **dims):
print(json.dumps({
"_aws": {
"Timestamp": int(time.time() * 1000),
"CloudWatchMetrics": [{
"Namespace": "eth/sign_pdfs_optimized",
"Dimensions": [list(dims.keys())],
"Metrics": [
{"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
for k in metrics
],
}],
},
**dims, **metrics,
}))
Improvement 2 — structured JSON logging. The original printed nothing. _log emits one JSON object per line to stdout. CloudWatch Logs Insights can then query fields directly:
filter event = "complete" | stats avg(duration_ms) by bin(5m)
The local runner's _extract_json_logs picks up these lines and surfaces them in the tester's "Structured logs" panel.
Improvement 3 — EMF metrics. _emit_emf writes a CloudWatch Embedded Metrics Format block to stdout. The Lambda runtime (and locally, the runner's _extract_emf_metrics) parses this and publishes real CloudWatch metrics — no PutMetricData API call, no extra latency, no per-metric cost. The unit is inferred from the metric name: anything ending in Bytes → "Bytes", everything else → "Count".
EMF is emitted once at the end of the invocation, not once per item. Putting it inside a loop would flood CloudWatch with thousands of data points per invocation.
Block 4 — Producer with try/finally
async def producer():
nonlocal pages
try:
paginator = s3.get_paginator("list_objects_v2")
async for page in paginator.paginate(
Bucket=cfg["bucket"], Prefix=cfg["prefix"],
PaginationConfig={"PageSize": cfg["page_size"]},
):
pages += 1
for obj in page.get("Contents", []) or []:
key = obj["Key"]
if key.lower().endswith(".pdf"):
await queue.put(key)
finally:
for _ in range(cfg["concurrency"]):
await queue.put(_DONE)
Improvement 4 — guaranteed consumer exit. The finally block runs whether the producer succeeds, raises a ClientError, or is cancelled. Each of the N consumers gets its own _DONE sentinel. Without finally, an exception in the producer leaves all consumers blocked on queue.get() forever — the handler hangs and eventually times out at 15 minutes.
Improvement 7 — PageSize=1000. S3's ListObjectsV2 returns at most 1000 keys per page by default, but specifying it explicitly avoids the paginator defaulting to a smaller value on some SDK versions. Fewer pages = fewer round-trip API calls for large prefixes.
Block 5 — Consumer with per-item error handling
async def consumer():
nonlocal errors
local = 0
while True:
item = await queue.get()
if item is _DONE:
return local
try:
url = await s3.generate_presigned_url(
"get_object",
Params={"Bucket": cfg["bucket"], "Key": item},
ExpiresIn=cfg["expiry"],
)
line = json.dumps({"key": item, "url": url}) + "\n"
async with write_lock:
await f.write(line)
local += 1
except Exception as exc:
errors += 1
_log("presign_error", request_id=request_id, key=item, error=str(exc))
Improvement 5 — per-item error isolation. The original let a single generate_presigned_url failure propagate out of the consumer and crash the whole batch. Here, each item is wrapped in its own try/except. A bad key increments errors and logs the details; the consumer continues with the next key. The caller receives {"count": N, "errors": M} and can decide whether to retry the M failures.
local counts successful presigns per consumer. The N return values are summed after asyncio.gather to get the total count.
The write_lock serialises writes to the shared JSONL file — without it, concurrent await f.write(line) calls would interleave partial lines.
Block 6 — N concurrent consumers (asyncio.gather)
prod_task = asyncio.create_task(producer())
counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
await prod_task
Improvement 8 — concurrent consumers. The original ran one consumer. Here asyncio.gather starts N consumer coroutines simultaneously, all reading from the same queue. Since generate_presigned_url is an async S3 API call, each consumer yields while waiting for the response, letting the others proceed. With N=4 and 1000 PDFs, peak in-flight presign requests is 4 instead of 1 — throughput roughly scales with concurrency up to the point where the producer can't keep the queue full.
asyncio.create_task(producer()) schedules the producer to run concurrently with the consumers rather than sequentially before them. The await prod_task after gather ensures any producer exception is re-raised (not silently swallowed as it would be in a fire-and-forget task).
Block 7 — Streaming manifest upload
manifest_bytes = os.path.getsize(manifest_path)
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
with open(manifest_path, "rb") as f:
await s3.put_object(
Bucket=cfg["bucket"], Key=manifest_key, Body=f,
ContentType="application/x-ndjson",
)
Improvement 6 — streaming upload. The original read the entire manifest into memory with await f.read() before uploading. For a bucket with 100k PDFs, the JSONL manifest could be tens of MB — a full read() doubles peak RAM usage. Passing a sync file handle as Body lets aiobotocore read the file in chunks internally. Memory usage stays flat regardless of manifest size.
os.path.getsize is called before opening the file for upload so the size is available for the EMF block even after the file is deleted.
Block 8 — Cleanup, metrics, and response
os.unlink(manifest_path)
result = {
"count": count,
"errors": errors,
"manifest_key": manifest_key,
"manifest_url": manifest_url,
}
response_bytes = len(json.dumps(result))
duration_ms = (time.monotonic() - t0) * 1000
_log("complete", request_id=request_id, count=count, errors=errors,
pages=pages, duration_ms=round(duration_ms, 2))
_emit_emf({
"PDFsProcessed": count, "S3ListPages": pages,
"PresignCount": count, "ManifestBytes": manifest_bytes,
"ResponseBytes": response_bytes,
}, Function="sign_pdfs_optimized")
return result
os.unlink removes the temp file after upload. /tmp is shared across warm invocations — not cleaning up leaks storage toward the 512 MB (default) or 10 GB (max) /tmp cap.
response_bytes = len(json.dumps(result)) captures how large the sync response would be. Lambda's synchronous response limit is 6 MB. This function returns a URL, not a body, so response_bytes is typically under 300 bytes — but measuring it makes the headroom concrete.
The final _log("complete", ...) and _emit_emf(...) emit together at the end so all counts are final. In the tester, the Structured logs panel shows the complete event and the EMF panel shows the five metric values.
Block 9 — Entrypoint
def handler(event, context):
request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
result = asyncio.run(_run(event, request_id))
return {"statusCode": 200, "body": json.dumps(result)}
asyncio.run creates a fresh event loop for each invocation. Lambda's Python runtime is synchronous at the handler boundary; asyncio.run is the correct bridge between the sync entrypoint and the async internals. A module-level loop = asyncio.get_event_loop() would work on the first invocation but is deprecated and unreliable across warm invocations.
getattr(context, "aws_request_id", ...) falls back to a generated UUID when the context object doesn't have the attribute — which is the case in the local tester, where the runner passes a minimal stub.
Summary of improvements
| # | What changed | Why it matters |
|---|---|---|
| 1 | Config from event, env as fallback | One function serves many prefixes |
| 2 | Structured JSON logging (_log) |
Queryable in Logs Insights; visible in tester |
| 3 | EMF metrics (_emit_emf) |
Free CloudWatch metrics, no API call overhead |
| 4 | Producer try/finally with N sentinels |
Consumers always exit; handler never hangs |
| 5 | Per-item try/except in consumer |
One bad key doesn't crash the batch |
| 6 | Sync file handle as Body |
Flat memory for any manifest size |
| 7 | PageSize=1000 |
Fewer S3 round-trips on large prefixes |
| 8 | N concurrent consumers via asyncio.gather |
Presign throughput scales with concurrency |
Further improvements (not yet applied)
These are the next natural steps if this function were going to production. They were left out intentionally — each adds infrastructure or AWS-side configuration that goes beyond the handler itself.
Idempotency
The function is not idempotent in the strict sense. Each invocation with the same (bucket, prefix) event produces a new manifest at a new UUID key. If Lambda retries the invocation (async invocations retry up to 2 times by default, and S3/SNS/EventBridge are at-least-once), you accumulate duplicate manifests in the manifests/ prefix.
The standard fix is a DynamoDB dedup table:
import hashlib, boto3 as _boto3
_ddb = _boto3.resource("dynamodb")
_table = _ddb.Table(os.environ["DEDUP_TABLE"])
def _dedup_key(cfg: dict) -> str:
raw = f"{cfg['bucket']}#{cfg['prefix']}"
return hashlib.sha256(raw.encode()).hexdigest()[:32]
# at the top of _run(), before any S3 work:
dedup_key = _dedup_key(cfg)
resp = _table.get_item(Key={"id": dedup_key})
if "Item" in resp:
return json.loads(resp["Item"]["result"]) # cached — skip all S3 work
# ... do the work ...
# at the end, before returning:
_table.put_item(Item={
"id": dedup_key,
"result": json.dumps(result),
"ttl": int(time.time()) + 86400,
})
The dedup key is derived from the logical job identity (bucket + prefix), not the request_id. Using request_id would only guard against Lambda's own retries of the same invocation; using the business key guards against a caller submitting the same job twice.
AWS PowerTools for Lambda has a built-in @idempotent decorator that implements this exact pattern, including TTL management and in-progress locking.
What it requires: a DynamoDB table, dynamodb:GetItem + dynamodb:PutItem IAM permissions on the execution role, and the PowerTools layer (or aws-lambda-powertools in requirements).
Manifest lifecycle rule
Every invocation writes a new object under manifests/. Without cleanup, this prefix grows unbounded. The fix is an S3 lifecycle rule on the bucket — not a handler change:
{
"Rules": [{
"ID": "expire-manifests",
"Filter": { "Prefix": "manifests/" },
"Status": "Enabled",
"Expiration": { "Days": 1 }
}]
}
Objects under manifests/ are deleted by S3 automatically after 1 day. The presigned URLs in those manifests are already short-lived (15 minutes by default), so there's no reason to keep the manifest longer than the URL validity window.
What it requires: a PutBucketLifecycleConfiguration call during infrastructure provisioning (CDK/Terraform/console) — nothing in the handler.
Idempotency + manifest lifecycle together
With both in place: a retry returns the same manifest_url pointing to the same (still-live) manifest object; after 24 hours the manifest is gone and the dedup record has expired, so the next invocation starts fresh. The combination is clean.
ReportBatchItemFailures (SQS only)
If this function were triggered by an SQS event source mapping (one message = one (bucket, prefix) job), the consumer-level errors field isn't enough — Lambda needs to know which SQS messages failed so it can re-queue only those. Return a batchItemFailures list instead of raising:
def handler(event, context):
failures = []
for record in event["Records"]:
body = json.loads(record["body"])
try:
result = asyncio.run(_run(body, context.aws_request_id))
except Exception as exc:
failures.append({"itemIdentifier": record["messageId"]})
return {"batchItemFailures": failures}
Without this, a single failed message causes the entire batch to retry, including messages that succeeded — work is repeated and the queue can stall on a poison-pill message indefinitely.
What it requires: ReportBatchItemFailures enabled on the ESM configuration (CDK/Terraform) and restructuring the handler to iterate over event["Records"]. Not applicable to direct (RequestResponse) invocations like the local tester uses.
arm64 / Graviton2
No code change needed. Switch the function's architecture to arm64 in the deployment config:
# SAM
Architectures: [arm64]
Graviton2 costs ~20% less per GB-second and typically runs the init phase ~10% faster. The only blocker is native-code wheels: aiobotocore ships pure Python so there's no binary incompatibility here. Worth doing as a zero-effort cost and latency win.