lambda_local_runner/docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
2026-05-13 17:23:25 -03:00

sign_pdfs_optimized — Walkthrough

Production-refined fork of sign_pdfs. Applies 8 improvements at once; meant to be compared side-by-side with the original in the local tester.

The original sign_pdfs is the "before" snapshot — it is left untouched. This file (sign_pdfs_optimized/handler.py) is the "after": same contract, same output shape, all the rough edges from the original fixed.


Block 1 — Imports and sentinel

import asyncio, json, os, time, uuid
import aioboto3
import aiofiles

_DONE = object()

Nothing new at the import level. _DONE is a sentinel object used to signal consumers that the producer has finished. Using a unique object (rather than None or a string) means no PDF key will ever accidentally match it.

Original issue fixed here: the original put a single _DONE on the queue regardless of how many consumers were running. With N consumers, one of them would consume the sentinel and exit, but the others would block forever waiting for a key that never arrives. The fix is one sentinel per consumer — that happens in the producer's finally block (Block 4).
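The one-sentinel-per-consumer rule can be sketched in isolation. This is a minimal illustration, not the handler's code: `run_pool` is a hypothetical helper, and the queue is pre-filled instead of being fed by a producer.

```python
import asyncio

_DONE = object()  # unique sentinel; compared with `is`, so no real key can match it


async def run_pool(keys, n_consumers):
    """Hypothetical helper: fan keys out to n_consumers, return per-consumer counts."""
    queue = asyncio.Queue()

    async def consumer():
        seen = 0
        while True:
            item = await queue.get()
            if item is _DONE:
                return seen
            seen += 1

    for key in keys:
        queue.put_nowait(key)
    # One sentinel per consumer. With a single sentinel, only one consumer
    # would return and the remaining n_consumers - 1 would wait forever.
    for _ in range(n_consumers):
        queue.put_nowait(_DONE)

    return await asyncio.gather(*(consumer() for _ in range(n_consumers)))
```

Calling `asyncio.run(run_pool(["a.pdf", "b.pdf", "c.pdf"], 4))` returns four counts that sum to 3; every consumer exits even though there are fewer keys than consumers.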


Block 2 — Config from event (_cfg)

def _cfg(event: dict) -> dict:
    return {
        "bucket":      event.get("bucket")       or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
        "prefix":      event.get("prefix")        or os.environ.get("PREFIX", "2026/04/"),
        "expiry":      int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
        "endpoint":    os.environ.get("S3_ENDPOINT_URL") or None,
        "page_size":   int(event.get("page_size")  or 1000),
        "concurrency": int(event.get("concurrency") or 4),
        "queue_max":   int(os.environ.get("QUEUE_MAX", "2000")),
    }

Improvement 1 — event-driven config. The original hard-coded bucket, prefix, and expiry as module-level constants read once from env vars. That means one function deployment = one bucket/prefix combination. Here the event is the primary source; env vars are fallbacks. The same deployed function can serve 2026/04/, 2026/05/, or an entirely different bucket by changing only the event payload — no redeployment.
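The precedence rule (event first, then env var, then built-in default) can be shown with a small sketch. `resolve` is a hypothetical helper that mirrors the `event.get(...) or os.environ.get(...)` pattern used in _cfg; the `env` parameter exists only so the example does not touch the real environment.

```python
import os


def resolve(event, name, env_var, default, env=os.environ):
    """Hypothetical helper: event value first, then env var, then the default."""
    return event.get(name) or env.get(env_var, default)


fake_env = {"PREFIX": "2026/04/"}

from_env = resolve({}, "prefix", "PREFIX", "fallback/", env=fake_env)
from_event = resolve({"prefix": "2026/05/"}, "prefix", "PREFIX", "fallback/", env=fake_env)
# Because the chain uses `or`, a falsy event value ("" or 0) also falls through.
from_empty = resolve({"prefix": ""}, "prefix", "PREFIX", "fallback/", env=fake_env)
from_default = resolve({}, "prefix", "PREFIX", "fallback/", env={})
```

One consequence of the `or` chain worth keeping in mind: an event cannot override a setting with an empty string or zero, because those values are treated as "not provided".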

endpoint is env-only because it is infrastructure (the MinIO URL in local dev, absent in production) — callers should never need to specify it per-request.

queue_max bounds the in-memory queue so the producer can't buffer an unbounded number of keys before consumers have a chance to drain it. With the defaults (1000 keys per page, 2000 queue slots), the producer can get at most about two pages ahead of the consumers.
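The backpressure effect of a bounded queue can be observed directly. The sketch below is an illustration under assumed sizes (10 keys, 3 slots); `demo_backpressure` is a hypothetical name, and the consumer's `asyncio.sleep(0)` stands in for real work.

```python
import asyncio

_DONE = object()


async def demo_backpressure(total_keys=10, queue_max=3):
    queue = asyncio.Queue(maxsize=queue_max)
    peak = 0  # largest queue size the producer ever observed

    async def producer():
        nonlocal peak
        for i in range(total_keys):
            await queue.put(f"key-{i}.pdf")  # suspends while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(_DONE)

    async def consumer():
        drained = 0
        while True:
            item = await queue.get()
            if item is _DONE:
                return drained
            await asyncio.sleep(0)  # placeholder for per-key work
            drained += 1

    prod_task = asyncio.create_task(producer())
    drained = await consumer()
    await prod_task
    return peak, drained
```

`asyncio.run(demo_backpressure())` drains all 10 keys while the observed peak never exceeds 3, which is the property queue_max relies on.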


Block 3 — Structured logging (_log) and EMF metrics (_emit_emf)

def _log(event_type: str, **fields):
    print(json.dumps({"event": event_type, **fields}))

def _emit_emf(metrics: dict, **dims):
    print(json.dumps({
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": "eth/sign_pdfs_optimized",
                "Dimensions": [list(dims.keys())],
                "Metrics": [
                    {"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
                    for k in metrics
                ],
            }],
        },
        **dims, **metrics,
    }))

Improvement 2 — structured JSON logging. The original printed nothing. _log emits one JSON object per line to stdout. CloudWatch Logs Insights can then query fields directly:

filter event = "complete" | stats avg(duration_ms) by bin(5m)

The local runner's _extract_json_logs picks up these lines and surfaces them in the tester's "Structured logs" panel.

Improvement 3 — EMF metrics. _emit_emf writes a CloudWatch Embedded Metric Format (EMF) block to stdout. In Lambda, stdout goes to CloudWatch Logs, which extracts the metrics from the log line asynchronously; locally, the runner's _extract_emf_metrics parses the same line. The handler makes no PutMetricData API call and adds no request latency. Standard custom-metric and log-ingestion pricing still applies. The unit is inferred from the metric name: anything ending in Bytes → "Bytes", everything else → "Count".

EMF is emitted once at the end of the invocation, not once per item. Putting it inside a loop would flood CloudWatch with thousands of data points per invocation.
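To make the payload shape concrete, here is a sketch that builds the same structure as _emit_emf but returns it instead of printing, so it can be inspected. `build_emf` and the namespace `example/namespace` are placeholders for illustration only.

```python
import json
import time


def build_emf(metrics, namespace="example/namespace", **dims):
    """Hypothetical variant of _emit_emf that returns the payload as a dict."""
    return {
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # epoch milliseconds
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                # One dimension set containing every dimension name.
                "Dimensions": [list(dims.keys())],
                "Metrics": [
                    {"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
                    for k in metrics
                ],
            }],
        },
        # Dimension and metric values live at the top level of the object.
        **dims, **metrics,
    }


payload = build_emf({"ManifestBytes": 2048, "PDFsProcessed": 12}, Function="demo")
line = json.dumps(payload)  # this single line is what would go to stdout
```

The metadata under `_aws` only names the metrics and dimensions; the actual values are the top-level `ManifestBytes`, `PDFsProcessed`, and `Function` fields of the same JSON object.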


Block 4 — Producer with try/finally

async def producer():
    nonlocal pages
    try:
        paginator = s3.get_paginator("list_objects_v2")
        async for page in paginator.paginate(
            Bucket=cfg["bucket"], Prefix=cfg["prefix"],
            PaginationConfig={"PageSize": cfg["page_size"]},
        ):
            pages += 1
            for obj in page.get("Contents", []) or []:
                key = obj["Key"]
                if key.lower().endswith(".pdf"):
                    await queue.put(key)
    finally:
        for _ in range(cfg["concurrency"]):
            await queue.put(_DONE)

Improvement 4 — guaranteed consumer exit. The finally block runs whether the producer succeeds, raises a ClientError, or is cancelled. Each of the N consumers gets its own _DONE sentinel. Without finally, an exception in the producer leaves all consumers blocked on queue.get() forever: the handler hangs until the function's configured timeout (15 minutes at most) ends the invocation.
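The failure path can be exercised without S3. In this sketch the producer raises a RuntimeError as a stand-in for a ClientError; `run_with_failing_producer` is a hypothetical name, and the queue size of 10 is arbitrary.

```python
import asyncio

_DONE = object()


async def run_with_failing_producer(n_consumers=3):
    queue = asyncio.Queue(maxsize=10)

    async def producer():
        try:
            await queue.put("a.pdf")
            raise RuntimeError("listing failed")  # stand-in for a ClientError
        finally:
            # Runs on success, failure, and cancellation alike.
            for _ in range(n_consumers):
                await queue.put(_DONE)

    async def consumer():
        seen = 0
        while True:
            item = await queue.get()
            if item is _DONE:
                return seen
            seen += 1

    prod_task = asyncio.create_task(producer())
    counts = await asyncio.gather(*(consumer() for _ in range(n_consumers)))
    try:
        await prod_task  # re-raises the producer's exception, if any
    except RuntimeError as exc:
        return counts, str(exc)
    return counts, None
```

`asyncio.run(run_with_failing_producer())` returns promptly: the one key queued before the failure is consumed, all three consumers exit, and the producer's error surfaces when the task is awaited.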

Improvement 7 — PageSize=1000. ListObjectsV2 returns at most 1000 keys per response, which is both the service default and the maximum. Setting PageSize explicitly pins the paginator to that maximum and makes it tunable from the event; a smaller page size would mean more round-trip API calls for large prefixes.


Block 5 — Consumer with per-item error handling

async def consumer():
    nonlocal errors
    local = 0
    while True:
        item = await queue.get()
        if item is _DONE:
            return local
        try:
            url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": cfg["bucket"], "Key": item},
                ExpiresIn=cfg["expiry"],
            )
            line = json.dumps({"key": item, "url": url}) + "\n"
            async with write_lock:
                await f.write(line)
            local += 1
        except Exception as exc:
            errors += 1
            _log("presign_error", request_id=request_id, key=item, error=str(exc))

Improvement 5 — per-item error isolation. The original let a single generate_presigned_url failure propagate out of the consumer and crash the whole batch. Here, each item is wrapped in its own try/except. A bad key increments errors and logs the details; the consumer continues with the next key. The caller receives {"count": N, "errors": M} and can decide whether to retry the M failures.

local counts successful presigns per consumer. The N return values are summed after asyncio.gather to get the total count.

The write_lock serialises writes to the shared JSONL file — without it, concurrent await f.write(line) calls would interleave partial lines.
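The error-isolation pattern can be tried with a fake signer. Everything here is illustrative: `fake_presign` stands in for s3.generate_presigned_url and fails for keys containing "bad", the URL is a dummy, and results go to a list instead of the locked JSONL file.

```python
import asyncio
import json

_DONE = object()


async def fake_presign(key):
    """Stand-in signer: raises for keys containing 'bad'."""
    await asyncio.sleep(0)
    if "bad" in key:
        raise ValueError(f"cannot sign {key}")
    return f"https://example.invalid/{key}?sig=demo"


async def sign_all(keys, concurrency=2):
    queue = asyncio.Queue()
    lines, failed = [], []

    async def consumer():
        local = 0  # successful presigns for this consumer only
        while True:
            item = await queue.get()
            if item is _DONE:
                return local
            try:
                url = await fake_presign(item)
                lines.append(json.dumps({"key": item, "url": url}))
                local += 1
            except Exception:
                failed.append(item)  # record the failure and keep going

    for key in keys:
        queue.put_nowait(key)
    for _ in range(concurrency):
        queue.put_nowait(_DONE)

    counts = await asyncio.gather(*(consumer() for _ in range(concurrency)))
    return {"count": sum(counts), "errors": len(failed), "failed": failed}
```

For `["a.pdf", "bad.pdf", "c.pdf"]` this yields a count of 2 and one error. Returning the failed keys is an addition in this sketch; the handler described above reports only the error total and logs the key.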


Block 6 — N concurrent consumers (asyncio.gather)

prod_task = asyncio.create_task(producer())
counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
await prod_task

Improvement 8 — concurrent consumers. The original ran one consumer. Here asyncio.gather starts N consumer coroutines simultaneously, all reading from the same queue. In aioboto3, generate_presigned_url is a coroutine, but the signature is computed locally and no request is sent to S3, so a consumer yields mainly while credentials are being resolved and while it waits for the file write. With N=4 and 1000 PDFs, up to 4 keys are in progress at once instead of 1. Expect throughput to improve with concurrency only until the producer can't keep the queue full or the serialised manifest write becomes the bottleneck.

asyncio.create_task(producer()) schedules the producer to run concurrently with the consumers rather than sequentially before them. The await prod_task after gather ensures any producer exception is re-raised (not silently swallowed as it would be in a fire-and-forget task).


Block 7 — Streaming manifest upload

manifest_bytes = os.path.getsize(manifest_path)
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"

with open(manifest_path, "rb") as f:
    await s3.put_object(
        Bucket=cfg["bucket"], Key=manifest_key, Body=f,
        ContentType="application/x-ndjson",
    )

Improvement 6 — streaming upload. The original read the entire manifest into memory with await f.read() before uploading. For a bucket with 100k PDFs, the JSONL manifest could be tens of MB — a full read() doubles peak RAM usage. Passing a sync file handle as Body lets aiobotocore read the file in chunks internally. Memory usage stays flat regardless of manifest size.

os.path.getsize is called before opening the file for upload so the size is available for the EMF block even after the file is deleted.
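The memory argument rests on reading a file handle in bounded chunks rather than all at once. The sketch below shows that idea only; it does not reproduce how aiobotocore consumes Body internally, and `iter_chunks` and the 64 KiB chunk size are assumptions made for illustration.

```python
import os
import tempfile


def iter_chunks(fileobj, chunk_size=64 * 1024):
    """Yield successive chunks so at most chunk_size bytes are held at a time."""
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            return
        yield chunk


# Build a throwaway JSONL file to stand in for the manifest.
with tempfile.NamedTemporaryFile(delete=False, suffix=".jsonl") as tmp:
    tmp.write(b'{"key": "a.pdf"}\n' * 10_000)
    path = tmp.name

size_on_disk = os.path.getsize(path)  # captured before the file is removed
with open(path, "rb") as f:
    chunk_sizes = [len(chunk) for chunk in iter_chunks(f)]
os.unlink(path)
```

The total read equals the size on disk, while no single chunk exceeds 64 KiB, which is why peak memory does not grow with the manifest.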


Block 8 — Cleanup, metrics, and response

os.unlink(manifest_path)

result = {
    "count": count,
    "errors": errors,
    "manifest_key": manifest_key,
    "manifest_url": manifest_url,
}
response_bytes = len(json.dumps(result))
duration_ms = (time.monotonic() - t0) * 1000

_log("complete", request_id=request_id, count=count, errors=errors,
     pages=pages, duration_ms=round(duration_ms, 2))
_emit_emf({
    "PDFsProcessed": count, "S3ListPages": pages,
    "PresignCount": count, "ManifestBytes": manifest_bytes,
    "ResponseBytes": response_bytes,
}, Function="sign_pdfs_optimized")

return result

os.unlink removes the temp file after upload. /tmp is shared across warm invocations — not cleaning up leaks storage toward the 512 MB (default) or 10 GB (max) /tmp cap.

response_bytes = len(json.dumps(result)) captures how large the sync response would be. Lambda's synchronous response limit is 6 MB. This function returns a URL, not a body, so response_bytes is typically under 300 bytes — but measuring it makes the headroom concrete.
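A headroom check along these lines makes the margin explicit. This is a sketch: `response_headroom` is a hypothetical helper, the limit is taken as 6 × 1024 × 1024 bytes for illustration, and the sample values are invented. It measures the full envelope the entrypoint returns, not just the inner result.

```python
import json

SYNC_RESPONSE_LIMIT = 6 * 1024 * 1024  # assumed byte value for the 6 MB limit


def response_headroom(result):
    """Return (bytes used by the response envelope, bytes left under the limit)."""
    body = json.dumps(result)
    envelope = json.dumps({"statusCode": 200, "body": body})
    used = len(envelope.encode("utf-8"))
    return used, SYNC_RESPONSE_LIMIT - used


used, left = response_headroom({
    "count": 1000,
    "errors": 0,
    "manifest_key": "manifests/example.jsonl",       # invented sample value
    "manifest_url": "https://example.invalid/manifest",  # invented sample value
})
```

Because json.dumps escapes non-ASCII characters by default, the character count used in the handler and the encoded byte count here agree for the inner result.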

The final _log("complete", ...) and _emit_emf(...) emit together at the end so all counts are final. In the tester, the Structured logs panel shows the complete event and the EMF panel shows the five metric values.


Block 9 — Entrypoint

def handler(event, context):
    request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
    result = asyncio.run(_run(event, request_id))
    return {"statusCode": 200, "body": json.dumps(result)}

asyncio.run creates a fresh event loop for each invocation. Lambda's Python runtime is synchronous at the handler boundary; asyncio.run is the correct bridge between the sync entrypoint and the async internals. A module-level loop = asyncio.get_event_loop() would work on the first invocation but is deprecated and unreliable across warm invocations.

getattr(context, "aws_request_id", ...) falls back to a generated UUID when the context object doesn't have the attribute — which is the case in the local tester, where the runner passes a minimal stub.
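Both behaviours of the entrypoint can be checked with stub contexts. In this sketch `_run` is a placeholder for the real async pipeline, and `StubContext` and `LambdaLikeContext` are hypothetical classes standing in for the local runner's stub and Lambda's context object.

```python
import asyncio
import json
import uuid


async def _run(event, request_id):
    """Placeholder for the real async pipeline."""
    await asyncio.sleep(0)
    return {"request_id": request_id, "prefix": event.get("prefix")}


def handler(event, context):
    request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
    result = asyncio.run(_run(event, request_id))  # fresh event loop per call
    return {"statusCode": 200, "body": json.dumps(result)}


class StubContext:
    """Minimal context with no aws_request_id attribute."""


class LambdaLikeContext:
    aws_request_id = "req-123"


with_id = handler({"prefix": "2026/05/"}, LambdaLikeContext())
without_id = handler({}, StubContext())  # second call: a new loop is created again
```

The first response carries `req-123`; the second carries a generated UUID. Calling the handler twice in one process also shows that asyncio.run does not depend on state left over from the previous invocation.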


Summary of improvements

| # | What changed | Why it matters |
|---|---|---|
| 1 | Config from event, env as fallback | One function serves many prefixes |
| 2 | Structured JSON logging (_log) | Queryable in Logs Insights; visible in tester |
| 3 | EMF metrics (_emit_emf) | CloudWatch metrics with no PutMetricData call or added latency |
| 4 | Producer try/finally with N sentinels | Consumers always exit; handler never hangs |
| 5 | Per-item try/except in consumer | One bad key doesn't crash the batch |
| 6 | Sync file handle as Body | Flat memory for any manifest size |
| 7 | PageSize=1000 | Fewer S3 round-trips on large prefixes |
| 8 | N concurrent consumers via asyncio.gather | Presign and write work overlaps across N consumers |