lambda_local_runner/docs/lambdas-md/lambda-19-repository.md
2026-05-11 20:27:17 -03:00

Repository

Tree of eth/ — the sandbox plus this study site.

eth/
├── lambda_function.py   — handler: async PDF scan → presigned URLs → JSONL manifest
├── invoke.py             — local runner: calls handler() with a minimal event, prints result
├── seed.py               — uploads PDFs from a local directory to MinIO
├── requirements.txt      — aioboto3, aiofiles (+ transitive: aiobotocore, botocore…)
├── docker-compose.yml    — runs MinIO on :9000 (S3 API) and :9001 (web console)
├── Makefile              — install / up / down / seed / invoke / graphs / docs
├── def/
│   └── task.md               — original interview exercise specification
└── docs/
    ├── index.html             — this study site (single-page, no build step)
    ├── viewer.html            — pan/zoom SVG viewer (opened by graph links)
    └── graphs/
        ├── system_overview.dot / .svg   — caller → handler → MinIO/S3 → manifest
        ├── lifecycle.dot / .svg         — init / handler / freeze / thaw / shutdown
        └── cold_warm_timeline.dot / .svg — cold vs warm invocation timeline

Walking through lambda_function.py

What the function does, end to end

In one paragraph: the function lists every PDF inside an S3 prefix. For each one, it generates a presigned download URL that expires in 15 minutes. It writes those (key, URL) pairs into a JSONL file in /tmp as it goes. When the listing is done, it uploads the JSONL to S3 as a manifest, generates one more presigned URL pointing to the manifest itself, deletes the local file, and returns the manifest URL plus the count.

The use case: you want to ship a batch of files to someone who isn't on your AWS account. Send them one URL. They open it, get back a list of links, every link works for 15 minutes, then everything dies.

Imports and module-scope config

import asyncio
import json
import os
import uuid

import aioboto3
import aiofiles

aioboto3 is the async version of boto3 — async S3 calls, so we can overlap I/O. aiofiles is async filesystem access — same reason.

BUCKET = os.environ.get("BUCKET_NAME", "my-company-reports-bucket")
PREFIX = os.environ.get("PREFIX", "2026/04/")
EXPIRY = int(os.environ.get("URL_EXPIRY_SECONDS", "900"))
ENDPOINT = os.environ.get("S3_ENDPOINT_URL") or None
QUEUE_MAX = int(os.environ.get("QUEUE_MAX", "2000"))

_DONE = object()

Five environment reads at module scope — init phase. They run once per cold start, get cached as Python module attributes, and every warm invocation reuses them for free.

ENDPOINT is the trick that lets this run against MinIO locally. When you run on real Lambda, you don't set the env var, the value is None, and aioboto3 talks to real S3. When you run locally, you set it to http://localhost:9000 and the same code talks to MinIO. The function doesn't know the difference.
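The trailing `or None` matters when the variable is set but empty: an empty string is falsy, so it collapses to None instead of being passed on as an endpoint URL. A minimal sketch of the same expression, wrapped in a hypothetical helper (resolve_endpoint is not part of the repository) that takes a plain dict so it can be exercised without touching the real environment:

```python
from typing import Mapping, Optional


def resolve_endpoint(env: Mapping[str, str]) -> Optional[str]:
    # Same expression as the module-scope line, applied to any mapping.
    return env.get("S3_ENDPOINT_URL") or None


on_lambda = resolve_endpoint({})                                   # unset
blank = resolve_endpoint({"S3_ENDPOINT_URL": ""})                  # set but empty
local = resolve_endpoint({"S3_ENDPOINT_URL": "http://localhost:9000"})
```

Both the unset and the blank case yield None, so only a non-empty value redirects the client to MinIO.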

_DONE is a sentinel — a unique singleton put on the queue to signal "no more items coming." The reason it's an object() and not a string: a string could theoretically collide with a real S3 key. An object() instance has a unique identity; comparing with is — not == — is unambiguous.
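A small illustration of the collision argument. The key named "DONE" is a made-up example, not something from the bucket:

```python
_DONE = object()

# Two ordinary strings and the sentinel, as they might appear on the queue.
items = ["2026/04/report.pdf", "DONE", _DONE]

# A string sentinel such as "DONE" would also match an S3 key named "DONE".
string_matches = [item == "DONE" for item in items]

# The object() sentinel matches only itself under an identity check.
identity_matches = [item is _DONE for item in items]
```

With a string sentinel the second item would stop the consumer early; with the identity check only the real sentinel does.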

The handler — minimal on purpose

def handler(event, context):
    result = asyncio.run(_run())
    return {"statusCode": 200, "body": json.dumps(result)}

The handler is sync because Lambda's contract is sync. AWS calls handler(event, context) and waits for it to return. Inside, asyncio.run opens a fresh event loop, runs the async coroutine, gets back a result. The API-Gateway-style response shape (statusCode + body) is a habit — useful when the function gets fronted by API Gateway later; a pure Lambda invoke doesn't need it but it doesn't hurt.

asyncio.run creates a fresh event loop per invocation. This means async clients can't be shared across invocations the way sync boto3 clients can. The cost is small — tens of microseconds — but it's the reason the S3 client is created inside _run, not at module scope.
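The fresh-loop behavior can be checked with the standard library alone. A minimal sketch (current_loop is a hypothetical helper):

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    # Returns the loop that asyncio.run created for this call.
    return asyncio.get_running_loop()


first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

# Each asyncio.run call built its own loop and closed it on exit.
different_loops = first is not second
first_closed = first.is_closed()
```

Anything bound to the first loop is unusable once that loop is closed, which is why a loop-bound client can't be parked at module scope and reused by the next invocation.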

Why async at all? Lambda bills per millisecond of wall-clock time. Anything you can overlap, you save money on. The function does a lot of S3 calls — listing pages, generating presigned URLs, writing files. While S3 is preparing the next page of results, the consumer is already presigning and writing the previous page. That overlap directly reduces duration and cost.

_run() — the actual work

async def _run():
    session = aioboto3.Session()
    async with session.client("s3", endpoint_url=ENDPOINT) as s3:
        queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAX)
        manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"

The session is created inside _run, not at module scope, because aioboto3 async clients are tied to the event loop — and each invocation gets a fresh event loop via asyncio.run. Sync boto3 clients you'd put at module scope; async ones you create per invocation.

The queue has a maximum size of 2000 by default. Without the bound, if the producer is faster than the consumer, the queue grows in memory. A Lambda function has at most 10 GB of memory, and is usually configured with 256 to 512 MB. Scanning a bucket with a million PDFs and loading them all before presigning even one would OOM. The bounded queue gives backpressure: when full, await queue.put(...) blocks until the consumer takes something off. Memory stays flat.

The manifest path uses a UUID so that back-to-back warm invocations on the same environment don't collide on /tmp. (/tmp persists across warm invocations; a fixed filename would be a race condition.)

The producer

        async def producer():
            paginator = s3.get_paginator("list_objects_v2")
            async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
                for obj in page.get("Contents", []) or []:
                    key = obj["Key"]
                    if key.lower().endswith(".pdf"):
                        await queue.put(key)
            await queue.put(_DONE)

Defined inside _run as a closure — captures s3 and queue from the enclosing scope without arguments. Also signals it's a private implementation detail.

S3 returns at most 1000 objects per page. The paginator hides the pagination — async for page in paginator.paginate(...) transparently fetches the next page when needed. For each object, filter by .pdf (case-insensitive) and put the key on the queue.

When the paginator is exhausted, put _DONE on the queue. That tells the consumer to stop. asyncio.Queue has no close method (Python 3.13 added Queue.shutdown(), but a sentinel works on every version), so the sentinel is the standard pattern.

await queue.put(key) blocks if the queue is full. That's the backpressure: producer pauses until consumer takes something off.
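The backpressure is easy to observe in isolation. A sketch with a deliberately tiny queue and a slow consumer; the sizes and the sleep are arbitrary stand-ins, not values from the function:

```python
import asyncio


async def demo(maxsize: int = 2, total: int = 10) -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    done = object()
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(total):
            await queue.put(i)             # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(done)

    async def consumer():
        while True:
            item = await queue.get()
            if item is done:
                break
            await asyncio.sleep(0.001)     # stand-in for a slower consumer

    task = asyncio.create_task(producer())
    await consumer()
    await task
    return peak


peak = asyncio.run(demo())
```

However many items the producer wants to emit, the number held in memory at once never exceeds maxsize.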

The consumer

        async def consumer():
            count = 0
            async with aiofiles.open(manifest_path, "w") as f:
                while True:
                    item = await queue.get()
                    if item is _DONE:
                        break
                    url = await s3.generate_presigned_url(
                        "get_object",
                        Params={"Bucket": BUCKET, "Key": item},
                        ExpiresIn=EXPIRY,
                    )
                    await f.write(json.dumps({"key": item, "url": url}) + "\n")
                    count += 1
            return count

Same closure pattern. Opens the manifest file async. Loops forever, pulling from the queue. On sentinel, breaks. Otherwise generates a presigned URL and writes a JSONL line.

generate_presigned_url is a local computation, not a network call. It uses your credentials, bucket, key, expiry, and region to produce a signed URL deterministically. Fast — no HTTP request.

Why JSONL instead of a JSON array? Because JSONL streams. You write one line at a time without buffering the whole array in memory. The reader can process one line at a time. If the manifest grows to gigabytes, JSONL stays usable.
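A sketch of both halves of that claim, with an in-memory buffer standing in for the /tmp file. The keys and URLs are placeholders and iter_manifest is a hypothetical reader:

```python
import io
import json
from typing import Iterable, Iterator

records = [
    {"key": f"2026/04/report-{i}.pdf", "url": f"https://example.invalid/{i}"}
    for i in range(3)
]

buf = io.StringIO()                        # stands in for the manifest file
for record in records:
    buf.write(json.dumps(record) + "\n")   # one complete JSON document per line


def iter_manifest(lines: Iterable[str]) -> Iterator[dict]:
    # Each line parses on its own, so the reader holds one record at a time.
    for line in lines:
        if line.strip():
            yield json.loads(line)


buf.seek(0)
parsed = list(iter_manifest(buf))
```

A JSON array would need the closing bracket before any standard parser accepts it; here every line is valid by itself the moment it is written.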

Running them together

        prod_task = asyncio.create_task(producer())
        count = await consumer()
        await prod_task

create_task schedules the producer on the event loop and returns immediately — producer runs in the background. await consumer() runs the consumer in the foreground until it sees the sentinel and returns the count. await prod_task ensures the producer has fully completed and propagates any exceptions.

This is the overlap: while S3 is preparing the next LIST page (network round trip), the consumer is presigning and writing the previous page. Sequential would be: list everything, then presign everything. With overlap you pay only the larger of the two latencies. For thousands of files, this cuts wall-clock time and cost noticeably.
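The effect can be approximated without S3 by simulating both sides with sleeps. The page count and latencies below are assumptions chosen only to make the difference visible; real numbers depend on the bucket and on how much of the consumer's time is actual I/O:

```python
import asyncio
import time

PAGES = 4
LIST_LATENCY = 0.05    # assumed per-page LIST round trip, seconds
WRITE_LATENCY = 0.05   # assumed per-page presign + write time, seconds


async def sequential() -> float:
    start = time.perf_counter()
    for _ in range(PAGES):                 # list everything first...
        await asyncio.sleep(LIST_LATENCY)
    for _ in range(PAGES):                 # ...then process everything
        await asyncio.sleep(WRITE_LATENCY)
    return time.perf_counter() - start


async def overlapped() -> float:
    queue: asyncio.Queue = asyncio.Queue()
    done = object()

    async def producer():
        for page in range(PAGES):
            await asyncio.sleep(LIST_LATENCY)
            await queue.put(page)
        await queue.put(done)

    async def consumer():
        while (await queue.get()) is not done:
            await asyncio.sleep(WRITE_LATENCY)

    start = time.perf_counter()
    task = asyncio.create_task(producer())
    await consumer()
    await task
    return time.perf_counter() - start


seq = asyncio.run(sequential())
ovl = asyncio.run(overlapped())
```

With equal latencies the sequential run takes roughly PAGES × (LIST + WRITE), while the overlapped run takes roughly PAGES × LIST plus one trailing WRITE.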

Uploading the manifest

        manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
        async with aiofiles.open(manifest_path, "rb") as f:
            body = await f.read()
        await s3.put_object(
            Bucket=BUCKET,
            Key=manifest_key,
            Body=body,
            ContentType="application/x-ndjson",
        )

Read the /tmp file as bytes and upload with put_object. Content type application/x-ndjson is the conventional (though not IANA-registered) media type for newline-delimited JSON. put_object is used rather than upload_file because a single PUT avoids the multipart transfer machinery, which isn't needed for files in the hundreds-of-KB to few-MB range.

Generating the manifest URL and cleaning up

        manifest_url = await s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": BUCKET, "Key": manifest_key},
            ExpiresIn=EXPIRY,
        )

        os.unlink(manifest_path)

        return {
            "count": count,
            "manifest_key": manifest_key,
            "manifest_url": manifest_url,
        }

Presign the manifest itself. Delete the /tmp file — /tmp persists across warm invocations; without cleanup, a thousand invocations on the same environment would fill it. Return count, S3 key, and the URL. The handler wraps that in {"statusCode": 200, "body": ...} and returns.
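In the code above, os.unlink only runs on the success path: if the upload raises, the file stays behind. One possible variation, not what the repository does, is to put the cleanup in a finally block. A synchronous sketch with a simulated failure (write_and_cleanup is a hypothetical name):

```python
import os
import tempfile
import uuid


def write_and_cleanup(path: str, fail: bool) -> None:
    try:
        with open(path, "w") as f:
            f.write('{"key": "example.pdf"}\n')
        if fail:
            raise RuntimeError("simulated upload failure")
    finally:
        # Runs on success and on exception alike.
        if os.path.exists(path):
            os.unlink(path)


path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4()}.jsonl")
try:
    write_and_cleanup(path, fail=True)
except RuntimeError:
    pass
leftover = os.path.exists(path)
```

This covers exceptions only. A timeout kills the process outright, so no finally block gets a chance to run.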

Why this design?

Why presigned URLs, not return the data directly? The response is small (one URL), the recipient doesn't need an AWS account to use it, and it expires automatically. The URL is signed by your credentials and works for anyone who has it for 15 minutes.

Why upload the manifest to S3 and return a URL to it, instead of returning the manifest contents inline? The 6 MB sync response cap. Ten thousand presigned URLs in JSONL is 3 to 5 MB. Twenty thousand blows the cap, and the failure is easy to miss: the function itself succeeds, but the caller gets a 413. The manifest-in-S3 pattern has no such upper bound.
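The arithmetic behind those numbers, as a rough sketch. The per-line sizes are assumptions (a presigned URL plus key and JSON overhead landing somewhere around 300 to 500 bytes), and the cap is taken as 6 × 1024 × 1024 bytes:

```python
SYNC_RESPONSE_CAP = 6 * 1024 * 1024   # assumed byte value of the 6 MB cap


def manifest_bytes(lines: int, avg_line_bytes: int) -> int:
    # Total size of a manifest with the given number of JSONL lines.
    return lines * avg_line_bytes


def max_lines_within_cap(avg_line_bytes: int) -> int:
    # How many lines of the given average size fit under the cap.
    return SYNC_RESPONSE_CAP // avg_line_bytes


ten_k = manifest_bytes(10_000, 500)       # 5,000,000 bytes: still fits
twenty_k = manifest_bytes(20_000, 500)    # 10,000,000 bytes: over the cap
limit_at_500 = max_lines_within_cap(500)
```

Embedding the manifest in the response body would also add JSON string escaping on top, so the real ceiling is somewhat lower than this estimate.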

Why async? Overlap S3 LIST calls with presigning and file writes. Even though presigning is local, the LIST round trips and final upload benefit from non-blocking I/O.

Why producer and consumer instead of one loop? The producer is bursty (up to 1000 keys per page dump). The consumer is steady. Decoupling with a queue means the producer races ahead while the consumer drains, instead of LIST → presign → LIST → presign serially.

Why a bounded queue? Backpressure. Without the bound, the producer can outrun the consumer and exhaust memory. With the bound, await queue.put(...) blocks when full. Memory stays flat regardless of bucket size.

Why a sentinel and not closing the queue? asyncio.Queue has no close method. The sentinel is the standard "done" signal.

Why nested functions? Closures over s3, queue, manifest_path. No arguments to pass. Private implementation details of _run.

Why UUID in the /tmp filename? /tmp persists across warm invocations. A fixed filename collides between back-to-back runs. UUID guarantees uniqueness.

Why _DONE = object() instead of a string sentinel? An object() instance has a unique identity that can't possibly collide with any real S3 key. is comparison (identity, not equality) is unambiguous.

Why os.unlink at the end? /tmp is per-environment, at most 10 GB, and persists. A thousand warm invocations without cleanup would fill it and crash subsequent runs.

Cold start vs warm — what you'd see in CloudWatch

First invocation (cold):

REPORT RequestId: ...  Duration: 312.45 ms  Billed Duration: 313 ms
       Memory Size: 256 MB  Max Memory Used: 89 MB
       Init Duration: 423.12 ms

Init Duration ~400 ms covers importing aioboto3 and aiofiles (aioboto3 pulls in aiobotocore which pulls in botocore — heavy). Duration ~300 ms is the actual scan: list, presign, write, upload.

Second invocation within 30 seconds (warm):

REPORT RequestId: ...  Duration: 287.91 ms  Billed Duration: 288 ms
       Memory Size: 256 MB  Max Memory Used: 91 MB

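No Init Duration line: the environment was reused and execution jumped straight to the handler. The handler Duration is only about 25 ms shorter, but the ~423 ms init phase is skipped entirely. For a function that runs once a day, every invocation is cold and init matters. For one that runs every few seconds, almost everything is warm.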
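To tell cold from warm programmatically, checking for the Init Duration field is enough. A sketch that parses the two REPORT lines shown above, joined onto single lines; parse_report is a hypothetical helper, and the exact whitespace in real log output may differ from this rendering:

```python
import re

REPORT_COLD = (
    "REPORT RequestId: ...  Duration: 312.45 ms  Billed Duration: 313 ms  "
    "Memory Size: 256 MB  Max Memory Used: 89 MB  Init Duration: 423.12 ms"
)
REPORT_WARM = (
    "REPORT RequestId: ...  Duration: 287.91 ms  Billed Duration: 288 ms  "
    "Memory Size: 256 MB  Max Memory Used: 91 MB"
)


def parse_report(line: str) -> dict:
    """Extract the millisecond fields from a REPORT line; absent fields are omitted."""
    fields = {}
    for name in ("Duration", "Billed Duration", "Init Duration"):
        # The lookbehind keeps plain "Duration" from matching inside the longer names.
        match = re.search(rf"(?<![A-Za-z] ){re.escape(name)}: ([\d.]+) ms", line)
        if match:
            fields[name] = float(match.group(1))
    return fields


cold = parse_report(REPORT_COLD)
warm = parse_report(REPORT_WARM)
is_cold = "Init Duration" in cold
```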

What happens if it times out

The default function timeout is 3 s, which is almost certainly not enough. Set it explicitly: 30 to 60 s for a small prefix, up to 900 s (15 min, the Lambda maximum) for a large one. If the function times out, Lambda kills the process. The /tmp file may not have been deleted, and the manifest may or may not have been uploaded. Re-running produces a fresh manifest under a new UUID; any manifest from the earlier run stays in S3 until a lifecycle rule expires it or someone deletes it manually.

How would you scale this

Fan out by prefix. Wrap in a Step Functions Map state. Pass a list of prefixes; each map iteration runs one Lambda for one prefix. MaxConcurrency controls parallelism without saturating the account concurrency quota.
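A sketch of what such a state machine definition could look like, built as a Python dict and serialized to JSON. The function ARN, state names, and concurrency value are placeholders, and the payload shape assumes the handler has been changed to read the prefix from the event, which the current code does not do:

```python
import json

# Hypothetical ARN; substitute the deployed function's ARN.
FUNCTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:pdf-manifest"

state_machine = {
    "StartAt": "ScanPrefixes",
    "States": {
        "ScanPrefixes": {
            "Type": "Map",
            "ItemsPath": "$.prefixes",      # input: {"prefixes": ["2026/03/", ...]}
            "MaxConcurrency": 5,            # at most 5 Lambdas in flight at once
            "ItemProcessor": {
                "ProcessorConfig": {"Mode": "INLINE"},
                "StartAt": "ScanOnePrefix",
                "States": {
                    "ScanOnePrefix": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::lambda:invoke",
                        "Parameters": {
                            "FunctionName": FUNCTION_ARN,
                            # Each iteration receives one prefix string as "$".
                            "Payload": {"prefix.$": "$"},
                        },
                        "End": True,
                    }
                },
            },
            "End": True,
        }
    },
}

definition = json.dumps(state_machine, indent=2)
```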

Go event-driven. Subscribe to S3 ObjectCreated events filtered to *.pdf. The function fires once per upload, handles one file at a time. No producer/consumer needed — nothing to enumerate. Simpler, but different semantics: "process new files as they arrive" vs "scan the existing bucket."
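In the event-driven variant the keys arrive in the event instead of coming from a LIST call. A minimal sketch of pulling them out of an S3 notification; pdf_keys_from_event is a hypothetical helper and the sample event is trimmed to the fields it reads:

```python
from urllib.parse import unquote_plus


def pdf_keys_from_event(event: dict) -> list:
    """Return (bucket, key) pairs for PDF objects named in an S3 notification."""
    pairs = []
    for record in event.get("Records", []):
        s3_info = record["s3"]
        bucket = s3_info["bucket"]["name"]
        # Object keys arrive URL-encoded, with spaces sent as "+".
        key = unquote_plus(s3_info["object"]["key"])
        if key.lower().endswith(".pdf"):
            pairs.append((bucket, key))
    return pairs


sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-company-reports-bucket"},
                "object": {"key": "2026/04/Q1+summary.PDF"}}},
        {"s3": {"bucket": {"name": "my-company-reports-bucket"},
                "object": {"key": "2026/04/notes.txt"}}},
    ]
}

pairs = pdf_keys_from_event(sample_event)
```

The suffix check is kept in code even when the notification itself is filtered, so the case-insensitive match behaves the same as in the scan version.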

What I'd change before production

  1. Move BUCKET and PREFIX to the event payload: they are currently set at deploy time, which means one function per prefix. Reading them from the event lets one function serve many prefixes.
  2. Structured logging: JSON to stdout with request_id, bucket, prefix, count. Logs Insights can aggregate without regex.
  3. EMF metric for count: a CloudWatch metric emitted through the log stream, with no additional API call. Dashboard "PDFs processed per invocation" over time.
  4. Producer error handling: if paginator.paginate raises, the producer task fails but the consumer keeps blocking on queue.get() forever, and the function times out. Wrap the producer body in try/finally that always puts _DONE on the queue so the consumer exits cleanly.
  5. Explicit timeout on queue.get(): await asyncio.wait_for(queue.get(), timeout=X) prevents the consumer hanging indefinitely if the producer dies without putting the sentinel.
  6. Consider sync boto3: aioboto3 adds ~200 ms to the cold start. If cold start matters and file counts are small, sync boto3 with threading is simpler and starts faster. Async pays off when file counts are large enough that overlap is significant.
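Items 4 and 5 can be sketched together without S3. The failure is simulated, the 5-second timeout is an arbitrary choice, and run_with_failing_producer is a hypothetical name:

```python
import asyncio

_DONE = object()


async def run_with_failing_producer() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def producer():
        try:
            await queue.put("2026/04/a.pdf")
            raise RuntimeError("simulated LIST failure")
        finally:
            # Always signal completion, even when listing fails.
            await queue.put(_DONE)

    async def consumer() -> int:
        count = 0
        while True:
            # Second line of defence: give up if nothing arrives in time.
            item = await asyncio.wait_for(queue.get(), timeout=5)
            if item is _DONE:
                break
            count += 1
        return count

    task = asyncio.create_task(producer())
    count = await consumer()
    try:
        await task                 # re-raises the producer's exception here
        error = None
    except RuntimeError as exc:
        error = str(exc)
    return count, error


count, error = asyncio.run(run_with_failing_producer())
```

The consumer drains what was queued before the failure and exits on the sentinel, and the original exception still surfaces at await task. One caveat: if the queue is full and the consumer has already stopped, the put in finally would itself block, which is where the timeout in item 5 earns its place.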

Design Q&A

Architectural questions worth keeping so they don't have to be re-derived.

Why not split producer and consumer into separate Lambda functions?

generate_presigned_url is local computation — no network call, just CPU. The consumer isn't blocked on anything external. The async coroutine pattern already provides the overlap benefit (S3 LIST wait ↔ presign + write) without any infrastructure overhead.

Splitting into two Lambdas would mean: in-process asyncio.Queue → SQS (latency + cost per message), two cold starts, two IAM roles, coordination logic — and the actual bottleneck (S3 LIST pagination) would be unchanged.

Split producer/consumer into separate Lambdas when: the consumer does real per-item I/O (external API calls, content downloads), per-item processing takes seconds not milliseconds, or you need independent retry semantics. None of those apply here.

Scale-out for this function belongs one level up: Step Functions Map state across prefixes (one Lambda per prefix), not within-prefix producer/consumer separation.

Makefile targets

Target                      What it does
make install                Creates .venv, installs requirements.txt
make up                     Starts MinIO via docker compose up -d
make down                   Stops MinIO (keeps volumes)
make clean                  Stops MinIO and deletes volumes (wipes bucket data)
SOURCE_DIR=path make seed   Uploads all files from path to MinIO
make invoke                 Runs invoke.py (calls handler() directly)
make graphs                 Renders every docs/graphs/*.dot file to .svg via Graphviz dot
make docs                   Renders graphs, then opens docs/index.html