diff --git a/docs/index.html b/docs/index.html
index 6cf9814..396260c 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -544,18 +544,24 @@
-      EMF metrics (CloudWatch embedded-metric-format on stdout — currently empty, will populate after improvement #3)
+      EMF metrics (CloudWatch embedded-metric-format on stdout — populates when the function emits, e.g. sign_pdfs_optimized)
       No EMF metrics detected.
+
+      Packaging (deployment sizes from the live pod — function zips, shared layer, largest deps, vs AWS caps)
+      <pre id="tester-packaging">Loading…</pre>
+
       History
       Click a row to view its full record. Cleared on page reload (FastAPI keeps the last 200 invocations server-side regardless).
+      <div id="tester-history-summary"></div>
-        <th>Status</th>
+        <th>Total (ms)</th><th>Status</th>
@@ -667,6 +673,12 @@ function renderReport(rec) {
   const initLine = m.init_duration_ms != null ? `Init Duration:${m.init_duration_ms.toFixed(2)} ms\n` : '';
+  // GB-s and projected cost — AWS Lambda bills (memory_GB × duration_s).
+  // Prices: $0.0000166667 per GB-s on x86, $0.0000133334 on arm64. +$0.20/1M requests.
+  const gbS = (m.memory_size_mb / 1024) * (m.duration_ms / 1000);
+  const costX86 = gbS * 0.0000166667;
+  const costArm = gbS * 0.0000133334;
+  const per1M = (cost) => (cost * 1_000_000 + 0.20).toFixed(2);
   $('tester-report').innerHTML = (
     `${tag} REPORT RequestId:${rec.invocation_id}\n` +
     `Function:${rec.function}\n` +
@@ -674,7 +686,12 @@ function renderReport(rec) {
     `Billed Duration:${m.billed_duration_ms} ms\n` +
     `Memory Size:${m.memory_size_mb} MB ` +
     `Max Memory Used:${m.max_memory_used_mb.toFixed(2)} MB\n` +
-    initLine
+    initLine +
+    `GB-seconds:${gbS.toFixed(6)} GB-s\n` +
+    `Cost (x86):$${costX86.toFixed(9)} ` +
+    `×1M:$${per1M(costX86)}\n` +
+    `Cost (arm64):$${costArm.toFixed(9)} ` +
+    `×1M:$${per1M(costArm)}`
   );
 }
@@ -727,8 +744,28 @@ async function loadHistory() {
   } catch (e) { /* ignore on first load if backend is still booting */ }
 }

+function _pctile(arr, p) {
+  if (!arr.length) return null;
+  const s = arr.slice().sort((a, b) => a - b);
+  const i = Math.min(s.length - 1, Math.floor((p / 100) * s.length));
+  return s[i];
+}
+
+function _renderHistorySummary() {
+  const el = $('tester-history-summary');
+  if (!el) return;
+  if (!_history.length) { el.textContent = ''; return; }
+  const cold = _history.filter(h => h.cold_start && h.init_duration_ms != null).map(h => h.init_duration_ms);
+  const warm = _history.filter(h => !h.cold_start).map(h => h.duration_ms);
+  const parts = [`${_history.length} invocations`];
+  if (cold.length) parts.push(`cold init p50 ${_pctile(cold, 50).toFixed(0)} ms p99 ${_pctile(cold, 99).toFixed(0)} ms`);
+  if (warm.length) parts.push(`warm p50 ${_pctile(warm, 50).toFixed(0)} ms p99 ${_pctile(warm, 99).toFixed(0)} ms`);
+  el.innerHTML = parts.join(' | ');
+}
+
 function renderHistory() {
   const tbody = $('tester-history-body');
+  _renderHistorySummary();
   if (!_history.length) {
     tbody.innerHTML = '';
     return;
@@ -751,7 +788,7 @@ function renderHistory() {
-      <td>${h.max_memory_used_mb.toFixed(2)}</td><td>${statusBadge}</td>`;
+      <td>${h.max_memory_used_mb.toFixed(2)}</td><td>${((h.init_duration_ms || 0) + h.duration_ms).toFixed(2)}</td><td>${statusBadge}</td>`;
   tr.addEventListener('click', () => loadHistoryDetail(h.invocation_id, tr));
   tbody.appendChild(tr);
@@ -773,10 +810,20 @@ async function loadHistoryDetail(id, row) {
   }
 }

+function _clearTesterPanels() {
+  // Wipe per-invocation panels (everything except the History table) so stale
+  // data from a previous run never leaks into the new one if it errors mid-flight.
+  $('tester-result').innerHTML = '…running…';
+  $('tester-stdout').innerHTML = '…running…';
+  $('tester-logs').innerHTML = '…running…';
+  $('tester-emf').innerHTML = '…running…';
+}
+
 async function invoke() {
   const btn = $('tester-invoke');
   btn.disabled = true;
   $('tester-report').innerHTML = '…running…';
+  _clearTesterPanels();
   try {
     const fn = $('tester-function').value;
     if (!fn) throw new Error('No function selected.');
@@ -819,6 +866,58 @@ async function resetCold() {
   }
 }

+function _fmtBytes(n) {
+  if (n < 1024) return n + ' B';
+  if (n < 1024 * 1024) return (n / 1024).toFixed(1) + ' KB';
+  if (n < 1024 * 1024 * 1024) return (n / (1024 * 1024)).toFixed(2) + ' MB';
+  return (n / (1024 * 1024 * 1024)).toFixed(2) + ' GB';
+}
+
+function _bar(used, cap) {
+  const pct = Math.min(100, (used / cap) * 100);
+  const colour = pct > 80 ? '#ff3d00' : pct > 50 ?
'#ffc107' : '#00c853';
+  return `<span style="color:${colour}">${pct.toFixed(2)}%</span>`;
+}
+
+async function loadPackaging() {
+  const pre = $('tester-packaging');
+  try {
+    const p = await api('/packaging');
+    const L = p.limits;
+    let out = 'Functions (one deployment zip per folder):\n';
+    for (const f of p.functions) {
+      out += `  ${f.name.padEnd(24)} ` +
+        `zip:${_fmtBytes(f.folder_zip_bytes).padStart(10)} ` +
+        `${_bar(f.folder_zip_bytes, L.zip_upload_max)} of 50 MB upload cap ` +
+        `unzipped:${_fmtBytes(f.folder_bytes)}\n`;
+    }
+    out += '\nShared layer (would be a Lambda Layer in AWS):\n';
+    out += `  ${'shared/'.padEnd(24)} ` +
+      `zip:${_fmtBytes(p.shared_layer.zip_bytes).padStart(10)} ` +
+      `unzipped:${_fmtBytes(p.shared_layer.bytes)}\n`;
+    const totalUnz = p.dependencies_total_bytes +
+      p.shared_layer.bytes +
+      p.functions.reduce((s, f) => s + f.folder_bytes, 0);
+    out += '\nLargest installed dependencies (top 25, ≥50 KB):\n';
+    for (const d of p.dependencies) {
+      out += `  ${d.name.padEnd(24)} ${_fmtBytes(d.bytes).padStart(10)}\n`;
+    }
+    out += `\nTotal deps:${_fmtBytes(p.dependencies_total_bytes)}\n`;
+    out += `Total unzipped (functions + shared + deps):${_fmtBytes(totalUnz)} ` +
+      `${_bar(totalUnz, L.unzipped_max)} of 250 MB unzipped cap\n`;
+    out += `\nAWS caps for reference:\n`;
+    out += `  zip upload ${_fmtBytes(L.zip_upload_max)} ` +
+      `unzipped ${_fmtBytes(L.unzipped_max)} ` +
+      `container image ${_fmtBytes(L.container_image_max)}\n`;
+    out += `  /tmp default ${_fmtBytes(L.tmp_default)} ` +
+      `/tmp max ${_fmtBytes(L.tmp_max)} ` +
+      `sync response ${_fmtBytes(L.response_max)}\n`;
+    pre.innerHTML = out;
+  } catch (e) {
+    pre.innerHTML = `[ERROR] ${e.message}`;
+  }
+}
+
 async function loadScripts() {
   const fn = $('tester-function').value;
   const sel = $('script-name');
@@ -883,6 +982,7 @@ document.addEventListener('DOMContentLoaded', () => {
   $('script-run').addEventListener('click', runScript);
   loadFunctions();
   loadHistory();
+  loadPackaging();
 });

diff --git a/docs/lambdas-md/lambda-20-sign-pdfs-optimized.md b/docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
new file mode 100644
index 0000000..bd78712
--- /dev/null
+++ b/docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
@@ -0,0 +1,237 @@
# sign_pdfs_optimized — Walkthrough

> Production-refined fork of `sign_pdfs`. Applies 8 improvements at once; meant to be compared side-by-side with the original in the local tester.

The original `sign_pdfs` is the "before" snapshot — it is left untouched. This file (`sign_pdfs_optimized/handler.py`) is the "after": same contract, same output shape, all the rough edges from the original fixed.

---

## Block 1 — Imports and sentinel

```python
import asyncio, json, os, time, uuid
import aioboto3
import aiofiles

_DONE = object()
```

Nothing new at the import level. `_DONE` is a sentinel object used to signal consumers that the producer has finished. Using a unique object (rather than `None` or a string) means no PDF key will ever accidentally match it.

**Original issue fixed here:** the original put a single `_DONE` on the queue regardless of how many consumers were running. With N consumers, one of them would consume the sentinel and exit, but the others would block forever waiting for a key that never arrives. The fix is one sentinel per consumer — that happens in the producer's `finally` block (Block 4).
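A toy repro makes the hang concrete (an illustrative script, not code from either handler):

```python
# Two consumers, one sentinel: a hypothetical minimal repro of the original bug.
import asyncio

_DONE = object()

async def consumer(q):
    while True:
        if await q.get() is _DONE:  # identity check: no real key can equal object()
            return

async def main():
    q = asyncio.Queue()
    await q.put("report.pdf")
    await q.put(_DONE)  # BUG: one sentinel shared by two consumers
    try:
        await asyncio.wait_for(asyncio.gather(consumer(q), consumer(q)), timeout=1)
    except asyncio.TimeoutError:
        print("hang: one consumer is still blocked on queue.get()")

asyncio.run(main())
```

Put one `_DONE` per consumer on the queue, as Block 4's `finally` does, and the `gather` returns immediately.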
+ +--- + +## Block 2 — Config from event (`_cfg`) + +```python +def _cfg(event: dict) -> dict: + return { + "bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"), + "prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"), + "expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")), + "endpoint": os.environ.get("S3_ENDPOINT_URL") or None, + "page_size": int(event.get("page_size") or 1000), + "concurrency": int(event.get("concurrency") or 4), + "queue_max": int(os.environ.get("QUEUE_MAX", "2000")), + } +``` + +**Improvement 1 — event-driven config.** The original hard-coded `bucket`, `prefix`, and `expiry` as module-level constants read once from env vars. That means one function deployment = one bucket/prefix combination. Here the event is the primary source; env vars are fallbacks. The same deployed function can serve `2026/04/`, `2026/05/`, or an entirely different bucket by changing only the event payload — no redeployment. + +`endpoint` is env-only because it is infrastructure (the MinIO URL in local dev, absent in production) — callers should never need to specify it per-request. + +`queue_max` bounds the in-memory queue so the producer can't buffer unbounded keys before consumers have a chance to drain it. At 1000 keys/page × 2000 queue slots, the producer is at most 2 pages ahead of the consumers. + +--- + +## Block 3 — Structured logging (`_log`) and EMF metrics (`_emit_emf`) + +```python +def _log(event_type: str, **fields): + print(json.dumps({"event": event_type, **fields})) + +def _emit_emf(metrics: dict, **dims): + print(json.dumps({ + "_aws": { + "Timestamp": int(time.time() * 1000), + "CloudWatchMetrics": [{ + "Namespace": "eth/sign_pdfs_optimized", + "Dimensions": [list(dims.keys())], + "Metrics": [ + {"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"} + for k in metrics + ], + }], + }, + **dims, **metrics, + })) +``` + +**Improvement 2 — structured JSON logging.** The original printed nothing. `_log` emits one JSON object per line to stdout. CloudWatch Logs Insights can then query fields directly: + +``` +filter event = "complete" | stats avg(duration_ms) by bin(5m) +``` + +The local runner's `_extract_json_logs` picks up these lines and surfaces them in the tester's "Structured logs" panel. + +**Improvement 3 — EMF metrics.** `_emit_emf` writes a CloudWatch Embedded Metrics Format block to stdout. The Lambda runtime (and locally, the runner's `_extract_emf_metrics`) parses this and publishes real CloudWatch metrics — no `PutMetricData` API call, no extra latency, no per-metric cost. The unit is inferred from the metric name: anything ending in `Bytes` → `"Bytes"`, everything else → `"Count"`. + +EMF is emitted once at the end of the invocation, not once per item. Putting it inside a loop would flood CloudWatch with thousands of data points per invocation. 
+ +--- + +## Block 4 — Producer with `try/finally` + +```python +async def producer(): + nonlocal pages + try: + paginator = s3.get_paginator("list_objects_v2") + async for page in paginator.paginate( + Bucket=cfg["bucket"], Prefix=cfg["prefix"], + PaginationConfig={"PageSize": cfg["page_size"]}, + ): + pages += 1 + for obj in page.get("Contents", []) or []: + key = obj["Key"] + if key.lower().endswith(".pdf"): + await queue.put(key) + finally: + for _ in range(cfg["concurrency"]): + await queue.put(_DONE) +``` + +**Improvement 4 — guaranteed consumer exit.** The `finally` block runs whether the producer succeeds, raises a `ClientError`, or is cancelled. Each of the N consumers gets its own `_DONE` sentinel. Without `finally`, an exception in the producer leaves all consumers blocked on `queue.get()` forever — the handler hangs and eventually times out at 15 minutes. + +**Improvement 7 — `PageSize=1000`.** S3's `ListObjectsV2` returns at most 1000 keys per page by default, but specifying it explicitly avoids the paginator defaulting to a smaller value on some SDK versions. Fewer pages = fewer round-trip API calls for large prefixes. + +--- + +## Block 5 — Consumer with per-item error handling + +```python +async def consumer(): + nonlocal errors + local = 0 + while True: + item = await queue.get() + if item is _DONE: + return local + try: + url = await s3.generate_presigned_url( + "get_object", + Params={"Bucket": cfg["bucket"], "Key": item}, + ExpiresIn=cfg["expiry"], + ) + line = json.dumps({"key": item, "url": url}) + "\n" + async with write_lock: + await f.write(line) + local += 1 + except Exception as exc: + errors += 1 + _log("presign_error", request_id=request_id, key=item, error=str(exc)) +``` + +**Improvement 5 — per-item error isolation.** The original let a single `generate_presigned_url` failure propagate out of the consumer and crash the whole batch. Here, each item is wrapped in its own `try/except`. A bad key increments `errors` and logs the details; the consumer continues with the next key. The caller receives `{"count": N, "errors": M}` and can decide whether to retry the M failures. + +`local` counts successful presigns per consumer. The N return values are summed after `asyncio.gather` to get the total count. + +The `write_lock` serialises writes to the shared JSONL file — without it, concurrent `await f.write(line)` calls would interleave partial lines. + +--- + +## Block 6 — N concurrent consumers (`asyncio.gather`) + +```python +prod_task = asyncio.create_task(producer()) +counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"]))) +await prod_task +``` + +**Improvement 8 — concurrent consumers.** The original ran one consumer. Here `asyncio.gather` starts N consumer coroutines simultaneously, all reading from the same queue. Since `generate_presigned_url` is an async S3 API call, each consumer yields while waiting for the response, letting the others proceed. With N=4 and 1000 PDFs, peak in-flight presign requests is 4 instead of 1 — throughput roughly scales with `concurrency` up to the point where the producer can't keep the queue full. + +`asyncio.create_task(producer())` schedules the producer to run concurrently with the consumers rather than sequentially before them. The `await prod_task` after gather ensures any producer exception is re-raised (not silently swallowed as it would be in a fire-and-forget task). 
+ +--- + +## Block 7 — Streaming manifest upload + +```python +manifest_bytes = os.path.getsize(manifest_path) +manifest_key = f"manifests/{uuid.uuid4()}.jsonl" + +with open(manifest_path, "rb") as f: + await s3.put_object( + Bucket=cfg["bucket"], Key=manifest_key, Body=f, + ContentType="application/x-ndjson", + ) +``` + +**Improvement 6 — streaming upload.** The original read the entire manifest into memory with `await f.read()` before uploading. For a bucket with 100k PDFs, the JSONL manifest could be tens of MB — a full `read()` doubles peak RAM usage. Passing a sync file handle as `Body` lets aiobotocore read the file in chunks internally. Memory usage stays flat regardless of manifest size. + +`os.path.getsize` is called before opening the file for upload so the size is available for the EMF block even after the file is deleted. + +--- + +## Block 8 — Cleanup, metrics, and response + +```python +os.unlink(manifest_path) + +result = { + "count": count, + "errors": errors, + "manifest_key": manifest_key, + "manifest_url": manifest_url, +} +response_bytes = len(json.dumps(result)) +duration_ms = (time.monotonic() - t0) * 1000 + +_log("complete", request_id=request_id, count=count, errors=errors, + pages=pages, duration_ms=round(duration_ms, 2)) +_emit_emf({ + "PDFsProcessed": count, "S3ListPages": pages, + "PresignCount": count, "ManifestBytes": manifest_bytes, + "ResponseBytes": response_bytes, +}, Function="sign_pdfs_optimized") + +return result +``` + +`os.unlink` removes the temp file after upload. `/tmp` is shared across warm invocations — not cleaning up leaks storage toward the 512 MB (default) or 10 GB (max) `/tmp` cap. + +`response_bytes = len(json.dumps(result))` captures how large the sync response would be. Lambda's synchronous response limit is 6 MB. This function returns a URL, not a body, so `response_bytes` is typically under 300 bytes — but measuring it makes the headroom concrete. + +The final `_log("complete", ...)` and `_emit_emf(...)` emit together at the end so all counts are final. In the tester, the Structured logs panel shows the `complete` event and the EMF panel shows the five metric values. + +--- + +## Block 9 — Entrypoint + +```python +def handler(event, context): + request_id = getattr(context, "aws_request_id", str(uuid.uuid4())) + result = asyncio.run(_run(event, request_id)) + return {"statusCode": 200, "body": json.dumps(result)} +``` + +`asyncio.run` creates a fresh event loop for each invocation. Lambda's Python runtime is synchronous at the handler boundary; `asyncio.run` is the correct bridge between the sync entrypoint and the async internals. A module-level `loop = asyncio.get_event_loop()` would work on the first invocation but is deprecated and unreliable across warm invocations. + +`getattr(context, "aws_request_id", ...)` falls back to a generated UUID when the context object doesn't have the attribute — which is the case in the local tester, where the runner passes a minimal stub. 
+ +--- + +## Summary of improvements + +| # | What changed | Why it matters | +|---|---|---| +| 1 | Config from event, env as fallback | One function serves many prefixes | +| 2 | Structured JSON logging (`_log`) | Queryable in Logs Insights; visible in tester | +| 3 | EMF metrics (`_emit_emf`) | Free CloudWatch metrics, no API call overhead | +| 4 | Producer `try/finally` with N sentinels | Consumers always exit; handler never hangs | +| 5 | Per-item `try/except` in consumer | One bad key doesn't crash the batch | +| 6 | Sync file handle as `Body` | Flat memory for any manifest size | +| 7 | `PageSize=1000` | Fewer S3 round-trips on large prefixes | +| 8 | N concurrent consumers via `asyncio.gather` | Presign throughput scales with `concurrency` | diff --git a/functions/sign_pdfs_optimized/events/default.json b/functions/sign_pdfs_optimized/events/default.json new file mode 100644 index 0000000..0967ef4 --- /dev/null +++ b/functions/sign_pdfs_optimized/events/default.json @@ -0,0 +1 @@ +{} diff --git a/functions/sign_pdfs_optimized/events/tuned.json b/functions/sign_pdfs_optimized/events/tuned.json new file mode 100644 index 0000000..44566ed --- /dev/null +++ b/functions/sign_pdfs_optimized/events/tuned.json @@ -0,0 +1,6 @@ +{ + "bucket": "my-company-reports-bucket", + "prefix": "2026/04/", + "page_size": 1000, + "concurrency": 4 +} diff --git a/functions/sign_pdfs_optimized/handler.py b/functions/sign_pdfs_optimized/handler.py new file mode 100644 index 0000000..54846d2 --- /dev/null +++ b/functions/sign_pdfs_optimized/handler.py @@ -0,0 +1,165 @@ +"""sign_pdfs_optimized — production-refined fork of sign_pdfs. + +Same contract as the original (def handler(event, context) → {"statusCode": 200, "body": ...}) +but with 8 production refinements applied side-by-side for the demo: + + 1. Config from the event payload (bucket / prefix / expiry / page_size / concurrency), + env vars are fallback only. Same function can serve many prefixes. + 2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms. + 3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount, + ManifestBytes, ResponseBytes. + 4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel + per consumer, not a single _DONE). + 5. Consumer per-item try/except — failed presigns count into errors, batch survives. + 6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM. + 7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes. + 8. N concurrent consumers via asyncio.gather — presign throughput scales with + the concurrency knob in the event. 
+""" +import asyncio +import json +import os +import time +import uuid + +import aioboto3 +import aiofiles + +_DONE = object() + + +def _cfg(event: dict) -> dict: + return { + "bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"), + "prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"), + "expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")), + "endpoint": os.environ.get("S3_ENDPOINT_URL") or None, + "page_size": int(event.get("page_size") or 1000), + "concurrency": int(event.get("concurrency") or 4), + "queue_max": int(os.environ.get("QUEUE_MAX", "2000")), + } + + +def _log(event_type: str, **fields): + print(json.dumps({"event": event_type, **fields})) + + +def _emit_emf(metrics: dict, **dims): + print(json.dumps({ + "_aws": { + "Timestamp": int(time.time() * 1000), + "CloudWatchMetrics": [{ + "Namespace": "eth/sign_pdfs_optimized", + "Dimensions": [list(dims.keys())], + "Metrics": [ + {"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"} + for k in metrics + ], + }], + }, + **dims, **metrics, + })) + + +async def _run(event: dict, request_id: str): + cfg = _cfg(event) + _log("start", request_id=request_id, bucket=cfg["bucket"], prefix=cfg["prefix"], + page_size=cfg["page_size"], concurrency=cfg["concurrency"]) + t0 = time.monotonic() + + pages = 0 + errors = 0 + session = aioboto3.Session() + async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3: + queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"]) + manifest_path = f"/tmp/{uuid.uuid4()}.jsonl" + + async def producer(): + nonlocal pages + try: + paginator = s3.get_paginator("list_objects_v2") + async for page in paginator.paginate( + Bucket=cfg["bucket"], Prefix=cfg["prefix"], + PaginationConfig={"PageSize": cfg["page_size"]}, + ): + pages += 1 + for obj in page.get("Contents", []) or []: + key = obj["Key"] + if key.lower().endswith(".pdf"): + await queue.put(key) + finally: + # one sentinel per consumer so each gather() task exits cleanly + for _ in range(cfg["concurrency"]): + await queue.put(_DONE) + + write_lock = asyncio.Lock() + async with aiofiles.open(manifest_path, "w") as f: + + async def consumer(): + nonlocal errors + local = 0 + while True: + item = await queue.get() + if item is _DONE: + return local + try: + url = await s3.generate_presigned_url( + "get_object", + Params={"Bucket": cfg["bucket"], "Key": item}, + ExpiresIn=cfg["expiry"], + ) + line = json.dumps({"key": item, "url": url}) + "\n" + async with write_lock: + await f.write(line) + local += 1 + except Exception as exc: + errors += 1 + _log("presign_error", request_id=request_id, key=item, error=str(exc)) + + prod_task = asyncio.create_task(producer()) + counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"]))) + await prod_task + + count = sum(counts) + manifest_bytes = os.path.getsize(manifest_path) + manifest_key = f"manifests/{uuid.uuid4()}.jsonl" + + # Sync file handle as Body — aiobotocore reads in chunks instead of buffering + # the whole manifest in memory like `body = await f.read()` would. 
+ with open(manifest_path, "rb") as f: + await s3.put_object( + Bucket=cfg["bucket"], Key=manifest_key, Body=f, + ContentType="application/x-ndjson", + ) + + manifest_url = await s3.generate_presigned_url( + "get_object", + Params={"Bucket": cfg["bucket"], "Key": manifest_key}, + ExpiresIn=cfg["expiry"], + ) + os.unlink(manifest_path) + + result = { + "count": count, + "errors": errors, + "manifest_key": manifest_key, + "manifest_url": manifest_url, + } + response_bytes = len(json.dumps(result)) + duration_ms = (time.monotonic() - t0) * 1000 + + _log("complete", request_id=request_id, count=count, errors=errors, + pages=pages, duration_ms=round(duration_ms, 2)) + _emit_emf({ + "PDFsProcessed": count, "S3ListPages": pages, + "PresignCount": count, "ManifestBytes": manifest_bytes, + "ResponseBytes": response_bytes, + }, Function="sign_pdfs_optimized") + + return result + + +def handler(event, context): + request_id = getattr(context, "aws_request_id", str(uuid.uuid4())) + result = asyncio.run(_run(event, request_id)) + return {"statusCode": 200, "body": json.dumps(result)} diff --git a/functions/sign_pdfs_optimized/requirements.txt b/functions/sign_pdfs_optimized/requirements.txt new file mode 100644 index 0000000..2e3d7fc --- /dev/null +++ b/functions/sign_pdfs_optimized/requirements.txt @@ -0,0 +1,3 @@ +# Same deps as sign_pdfs/ — this is the "after" demo fork, same dep footprint. +aioboto3>=15.0 # async S3 client +aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp diff --git a/runner.py b/runner.py index 2343519..b265676 100644 --- a/runner.py +++ b/runner.py @@ -19,9 +19,11 @@ import os import resource import subprocess import sys +import sysconfig import time import traceback import uuid +import zipfile from contextlib import redirect_stderr, redirect_stdout from pathlib import Path @@ -42,6 +44,7 @@ if SHARED_DIR.exists(): app = FastAPI(title="Lambda Local Runner") _modules: dict = {} # name -> imported module (cache; presence = warm) +_module_deps: dict = {} # name -> set of sys.modules keys added during this function's init _invocations: list[dict] = [] # newest last; capped at MAX_INVOCATIONS @@ -156,6 +159,11 @@ def invoke(name: str, req: InvokeRequest): f"functions.{name}.handler", target, ) module = importlib.util.module_from_spec(spec) + # Snapshot sys.modules so /reset can pop the transitive deps this function + # pulled in (aioboto3 → aiobotocore → botocore, etc.). Without this the + # second cold start is fake — heavy imports stay cached in the long-running + # uvicorn process, and Force Cold reports unrealistically small numbers. + sys_modules_before = set(sys.modules) t0 = time.monotonic() try: spec.loader.exec_module(module) @@ -166,6 +174,7 @@ def invoke(name: str, req: InvokeRequest): return _record(record) init_duration_ms = (time.monotonic() - t0) * 1000 _modules[name] = module + _module_deps[name] = set(sys.modules) - sys_modules_before record["metrics"]["init_duration_ms"] = round(init_duration_ms, 2) module = _modules[name] @@ -244,13 +253,19 @@ def clear_invocations(): @app.post("/reset") def reset_modules(): - """Clear the module cache so the next invocation is cold. 
Useful for
-    A/B-ing cold-start cost without restarting the FastAPI process."""
+    """Clear the module cache AND the transitive imports each function pulled in
+    during its init, so the next invocation pays a realistic cold-start cost
+    (re-importing aioboto3 → aiobotocore → botocore from disk, not a no-op
+    against an already-warm uvicorn process)."""
     cleared = list(_modules.keys())
-    _modules.clear()
+    popped = 0
     for name in cleared:
-        sys.modules.pop(name, None)
-    return {"cleared": cleared}
+        sys.modules.pop(f"functions.{name}.handler", None)
+        for dep in _module_deps.pop(name, ()):
+            if sys.modules.pop(dep, None) is not None:
+                popped += 1
+    _modules.clear()
+    return {"cleared": cleared, "transitive_modules_popped": popped}
 
 
 @app.get("/functions/{name}/scripts")
@@ -291,6 +306,85 @@ def run_script(fn_name: str, script_name: str, req: ScriptRequest):
     }
 
 
+@app.get("/packaging")
+def packaging():
+    """Static sizing report — what each function would ship as a Lambda deployment zip,
+    what the shared layer would weigh, what the largest installed deps look like.
+    All computed against the live pod filesystem so numbers are real, not extrapolated."""
+    funcs = []
+    for d in sorted(FUNCTIONS_DIR.iterdir()):
+        if not d.is_dir() or d.name.startswith("_"):
+            continue
+        if not (d / "handler.py").exists():
+            continue
+        funcs.append({
+            "name": d.name,
+            "handler_bytes": (d / "handler.py").stat().st_size,
+            "folder_bytes": _dir_bytes(d),
+            "folder_zip_bytes": _zip_bytes(d),
+        })
+
+    site_packages = _site_packages_dir()
+    deps: list[dict] = []
+    if site_packages:
+        for child in sorted(site_packages.iterdir()):
+            if not child.is_dir():
+                continue
+            if child.name.startswith("_") or child.name.endswith(".dist-info"):
+                continue
+            b = _dir_bytes(child)
+            if b > 50_000:
+                deps.append({"name": child.name, "bytes": b})
+    deps.sort(key=lambda x: -x["bytes"])
+
+    shared_bytes = _dir_bytes(SHARED_DIR) if SHARED_DIR.exists() else 0
+    shared_zip = _zip_bytes(SHARED_DIR) if SHARED_DIR.exists() else 0
+
+    return {
+        "functions": funcs,
+        "dependencies": deps[:25],
+        "dependencies_total_bytes": sum(d["bytes"] for d in deps),
+        "shared_layer": {"bytes": shared_bytes, "zip_bytes": shared_zip},
+        "limits": {
+            "zip_upload_max": 50 * 1024 * 1024,
+            "unzipped_max": 250 * 1024 * 1024,
+            "container_image_max": 10 * 1024 * 1024 * 1024,
+            "tmp_default": 512 * 1024 * 1024,
+            "tmp_max": 10 * 1024 * 1024 * 1024,
+            "response_max": 6 * 1024 * 1024,
+        },
+    }
+
+
+def _dir_bytes(path: Path) -> int:
+    total = 0
+    for p in path.rglob("*"):
+        if p.is_file():
+            try:
+                total += p.stat().st_size
+            except OSError:
+                pass
+    return total
+
+
+def _zip_bytes(path: Path) -> int:
+    """Compute deflate-zipped size without writing to disk."""
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
+        for p in path.rglob("*"):
+            if p.is_file():
+                try:
+                    zf.write(p, p.relative_to(path))
+                except OSError:
+                    pass
+    return buf.getbuffer().nbytes
+
+
+def _site_packages_dir() -> Path | None:
+    purelib = sysconfig.get_paths().get("purelib")
+    return Path(purelib) if purelib and Path(purelib).exists() else None
+
+
 @app.get("/health")
 def health():
     return {"ok": True, "loaded_modules": list(_modules.keys()), "invocations": len(_invocations)}
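A quick way to eyeball the new endpoint once the runner is up (assumes uvicorn on localhost:8000; adjust to your invocation):

```python
import json
import urllib.request

# Fetch the sizing report from the local runner (the port is an assumption).
with urllib.request.urlopen("http://localhost:8000/packaging") as resp:
    report = json.load(resp)

print(report["limits"]["zip_upload_max"])      # 52428800 → the 50 MB zip cap
for fn in report["functions"]:
    print(fn["name"], fn["folder_zip_bytes"])  # one deployment zip per folder
```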