Compare commits

...

2 Commits

Author SHA1 Message Date
29b095d583 package size calculator 2026-05-13 17:23:25 -03:00
6652cb26e6 add tester ui, and restructure folders 2026-05-13 17:00:00 -03:00
22 changed files with 3262 additions and 1251 deletions

View File

@@ -2,9 +2,27 @@ FROM python:3.13-slim
WORKDIR /app WORKDIR /app
# Function-specific deps first. Each function carries its own requirements.txt
# (so a real AWS deploy zips the function folder verbatim). Locally, the pod
# installs the union of all of them.
COPY functions/ ./functions/
RUN set -e; \
for r in functions/*/requirements.txt; do \
[ -f "$r" ] && pip install --no-cache-dir -r "$r"; \
done
# Runner deps (FastAPI + uvicorn). Lives only in the runner pod; NOT bundled
# with any function zip for AWS.
COPY requirements.txt ./ COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
COPY lambda_function.py invoke.py seed.py ./ # Shared modules (Lambda-Layer equivalent) and generic tooling.
COPY shared/ ./shared/
COPY invoke.py runner.py ./
CMD ["sleep", "infinity"] # uvicorn --reload restarts on .py change → every code edit produces a fresh
# cold start, matching how AWS Lambda's container lifecycle works when you
# redeploy. Watching the whole /app tree picks up function-folder edits too.
CMD ["uvicorn", "runner:app", \
"--host", "0.0.0.0", "--port", "8000", \
"--reload", "--reload-dir", "/app"]

View File

@@ -23,9 +23,16 @@ docker_build(
dockerfile='Dockerfile.lambda', dockerfile='Dockerfile.lambda',
ignore=['.git', 'def', '.venv', 'docs', '__pycache__', '.pytest_cache'], ignore=['.git', 'def', '.venv', 'docs', '__pycache__', '.pytest_cache'],
live_update=[ live_update=[
sync('../lambda_function.py', '/app/lambda_function.py'), # Whole functions/ directory — new files appear in the tester's
# function list automatically; edits to existing files cause uvicorn
# to drop them from the warm-cache so the next invoke is cold (the
# `reset_modules` endpoint also lets you force it manually).
sync('../functions', '/app/functions'),
sync('../invoke.py', '/app/invoke.py'), sync('../invoke.py', '/app/invoke.py'),
sync('../seed.py', '/app/seed.py'), sync('../seed.py', '/app/seed.py'),
# runner.py change → uvicorn --reload restarts the process → all
# function modules drop out of the cache, next invocation cold.
sync('../runner.py', '/app/runner.py'),
], ],
) )
@@ -45,7 +52,18 @@ k8s_resource('lambda', resource_deps=['minio'])
k8s_resource('docs') k8s_resource('docs')
k8s_resource('gateway', resource_deps=['docs', 'minio']) k8s_resource('gateway', resource_deps=['docs', 'minio'])
# Hot-reload gateway Caddy on Caddyfile edit. configMapGenerator uses
# disableNameSuffixHash so the Deployment template doesn't change → kustomize
# won't roll the pod on its own. This local_resource closes the loop.
local_resource(
'gateway-reload',
cmd='kubectl --context kind-eth -n eth rollout restart deployment/gateway',
deps=['k8s/base/Caddyfile'],
resource_deps=['gateway'],
auto_init=False,
)
k8s_resource( k8s_resource(
objects=['eth:namespace', 'eth-config:configmap'], objects=['eth:namespace', 'eth-config:configmap', 'gateway-config:configmap'],
new_name='infra', new_name='infra',
) )

View File

@@ -1,3 +1,12 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Invoke a function from functions/ directly via `python invoke.py`.
# Usage:
# ctrl/invoke.sh # default: first function found
# ctrl/invoke.sh lambda_function # specific function
# ctrl/invoke.sh lambda_function '{"key":"val"}' # with event payload
#
# For the same invocation through the FastAPI tester (with cold/warm + memory
# metrics) use the Lambda Tester tab at http://eth.local.ar or POST to
# http://eth.local.ar/runner/invoke/<name>.
set -euo pipefail set -euo pipefail
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python invoke.py "$@" kubectl --context kind-eth -n eth exec -i deploy/lambda -- python invoke.py "$@"

View File

@@ -4,6 +4,22 @@
} }
eth.local.ar:80 { eth.local.ar:80 {
# API surface for the local Lambda tester (FastAPI in the lambda pod).
# Path-based is fine for our own API — no SPA assumes ownership of `/`,
# and a single origin means no CORS headaches with the frontend at /.
handle_path /runner/* {
reverse_proxy lambda:8000
}
# Everything else → static docs viewer (the frontend lives here too).
handle {
reverse_proxy docs:80
}
}
docs.eth.local.ar:80 {
# Serve /docs.html for the root path; everything else (graphs, viewer.html) passes through.
rewrite / /docs.html
reverse_proxy docs:80 reverse_proxy docs:80
} }

View File

@@ -12,10 +12,14 @@ resources:
- gateway.yaml - gateway.yaml
# Generate the gateway Caddyfile ConfigMap from the standalone file. # Generate the gateway Caddyfile ConfigMap from the standalone file.
# Hash suffix is on by default — when Caddyfile changes, the ConfigMap gets # Hash suffix disabled so the name stays static — lets Tilt group it under
# a new hashed name, kustomize rewrites the Deployment volume reference, # the 'infra' resource (no "uncategorized" pill). Trade-off: pod doesn't
# and the gateway pod restarts automatically with the new config. # auto-restart on Caddyfile change; the Tiltfile has a local_resource
# 'gateway-reload' that does `kubectl rollout restart` whenever Caddyfile
# is edited, so the experience is the same in practice.
configMapGenerator: configMapGenerator:
- name: gateway-config - name: gateway-config
files: files:
- Caddyfile - Caddyfile
options:
disableNameSuffixHash: true

View File

@@ -16,8 +16,14 @@ spec:
containers: containers:
- name: lambda - name: lambda
image: eth-lambda image: eth-lambda
command: ["sleep", "infinity"] # The container runs the FastAPI runner (see runner.py + Dockerfile).
# The CMD comes from the Dockerfile (uvicorn). Container also stays
# exec-able for `bash ctrl/seed.sh` / `bash ctrl/invoke.sh` which
# spawn separate python processes alongside uvicorn.
workingDir: /app workingDir: /app
ports:
- name: http
containerPort: 8000
envFrom: envFrom:
- configMapRef: - configMapRef:
name: eth-config name: eth-config
@@ -25,14 +31,33 @@ spec:
- name: documents - name: documents
mountPath: /mnt/documents mountPath: /mnt/documents
readOnly: true readOnly: true
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 3
periodSeconds: 5
resources: resources:
requests: requests:
memory: 128Mi memory: 128Mi
cpu: 100m cpu: 100m
limits: limits:
memory: 512Mi memory: 1Gi
volumes: volumes:
- name: documents - name: documents
hostPath: hostPath:
path: /mnt/documents path: /mnt/documents
type: Directory type: Directory
---
apiVersion: v1
kind: Service
metadata:
name: lambda
namespace: eth
spec:
selector:
app: lambda
ports:
- name: http
port: 8000
targetPort: 8000

View File

@@ -1,3 +1,8 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Uploads the host's /mnt/documents tree into the in-cluster MinIO. Lives
# inside the sign_pdfs function folder because it's specific to *this*
# function's data shape (bucket "my-company-reports-bucket", prefix "2026/04/").
# Other functions will have their own seed scripts in their own folders.
set -euo pipefail set -euo pipefail
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python seed.py /mnt/documents kubectl --context kind-eth -n eth exec -i deploy/lambda -- \
python functions/sign_pdfs/seed.py /mnt/documents

1616
docs/docs.html Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,237 @@
# sign_pdfs_optimized — Walkthrough
> Production-refined fork of `sign_pdfs`. Applies 8 improvements at once; meant to be compared side-by-side with the original in the local tester.
The original `sign_pdfs` is the "before" snapshot — it is left untouched. This file (`sign_pdfs_optimized/handler.py`) is the "after": same contract, same output shape, all the rough edges from the original fixed.
---
## Block 1 — Imports and sentinel
```python
import asyncio, json, os, time, uuid
import aioboto3
import aiofiles
_DONE = object()
```
Nothing new at the import level. `_DONE` is a sentinel object used to signal consumers that the producer has finished. Using a unique object (rather than `None` or a string) means no PDF key will ever accidentally match it.
**Original issue fixed here:** the original put a single `_DONE` on the queue regardless of how many consumers were running. With N consumers, one of them would consume the sentinel and exit, but the others would block forever waiting for a key that never arrives. The fix is one sentinel per consumer — that happens in the producer's `finally` block (Block 4).
---
## Block 2 — Config from event (`_cfg`)
```python
def _cfg(event: dict) -> dict:
return {
"bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
"prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"),
"expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
"endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
"page_size": int(event.get("page_size") or 1000),
"concurrency": int(event.get("concurrency") or 4),
"queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
}
```
**Improvement 1 — event-driven config.** The original hard-coded `bucket`, `prefix`, and `expiry` as module-level constants read once from env vars. That means one function deployment = one bucket/prefix combination. Here the event is the primary source; env vars are fallbacks. The same deployed function can serve `2026/04/`, `2026/05/`, or an entirely different bucket by changing only the event payload — no redeployment.
`endpoint` is env-only because it is infrastructure (the MinIO URL in local dev, absent in production) — callers should never need to specify it per-request.
`queue_max` bounds the in-memory queue so the producer can't buffer an unbounded number of keys before consumers have a chance to drain it. With the default 1000 keys per page and 2000 queue slots, the producer can run at most about two pages ahead of the consumers.
---
## Block 3 — Structured logging (`_log`) and EMF metrics (`_emit_emf`)
```python
def _log(event_type: str, **fields):
print(json.dumps({"event": event_type, **fields}))
def _emit_emf(metrics: dict, **dims):
print(json.dumps({
"_aws": {
"Timestamp": int(time.time() * 1000),
"CloudWatchMetrics": [{
"Namespace": "eth/sign_pdfs_optimized",
"Dimensions": [list(dims.keys())],
"Metrics": [
{"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
for k in metrics
],
}],
},
**dims, **metrics,
}))
```
**Improvement 2 — structured JSON logging.** The original printed nothing. `_log` emits one JSON object per line to stdout. CloudWatch Logs Insights can then query fields directly:
```
filter event = "complete" | stats avg(duration_ms) by bin(5m)
```
The local runner's `_extract_json_logs` picks up these lines and surfaces them in the tester's "Structured logs" panel.
**Improvement 3 — EMF metrics.** `_emit_emf` writes a CloudWatch Embedded Metrics Format block to stdout. The Lambda runtime (and locally, the runner's `_extract_emf_metrics`) parses this and publishes real CloudWatch metrics — no `PutMetricData` API call, no extra latency, no per-metric cost. The unit is inferred from the metric name: anything ending in `Bytes` → `"Bytes"`, everything else → `"Count"`.
EMF is emitted once at the end of the invocation, not once per item. Putting it inside a loop would flood CloudWatch with thousands of data points per invocation.
---
## Block 4 — Producer with `try/finally`
```python
async def producer():
nonlocal pages
try:
paginator = s3.get_paginator("list_objects_v2")
async for page in paginator.paginate(
Bucket=cfg["bucket"], Prefix=cfg["prefix"],
PaginationConfig={"PageSize": cfg["page_size"]},
):
pages += 1
for obj in page.get("Contents", []) or []:
key = obj["Key"]
if key.lower().endswith(".pdf"):
await queue.put(key)
finally:
for _ in range(cfg["concurrency"]):
await queue.put(_DONE)
```
**Improvement 4 — guaranteed consumer exit.** The `finally` block runs whether the producer succeeds, raises a `ClientError`, or is cancelled. Each of the N consumers gets its own `_DONE` sentinel. Without `finally`, an exception in the producer leaves all consumers blocked on `queue.get()` forever — the handler hangs and eventually times out at 15 minutes.
**Improvement 7 — `PageSize=1000`.** S3's `ListObjectsV2` returns at most 1000 keys per page by default, but specifying it explicitly avoids the paginator defaulting to a smaller value on some SDK versions. Fewer pages = fewer round-trip API calls for large prefixes. The original `sign_pdfs` paginates with `PageSize=100`, so for the same prefix this is up to 10× fewer list calls.
---
## Block 5 — Consumer with per-item error handling
```python
async def consumer():
nonlocal errors
local = 0
while True:
item = await queue.get()
if item is _DONE:
return local
try:
url = await s3.generate_presigned_url(
"get_object",
Params={"Bucket": cfg["bucket"], "Key": item},
ExpiresIn=cfg["expiry"],
)
line = json.dumps({"key": item, "url": url}) + "\n"
async with write_lock:
await f.write(line)
local += 1
except Exception as exc:
errors += 1
_log("presign_error", request_id=request_id, key=item, error=str(exc))
```
**Improvement 5 — per-item error isolation.** The original let a single `generate_presigned_url` failure propagate out of the consumer and crash the whole batch. Here, each item is wrapped in its own `try/except`. A bad key increments `errors` and logs the details; the consumer continues with the next key. The caller receives `{"count": N, "errors": M}` and can decide whether to retry the M failures.
`local` counts successful presigns per consumer. The N return values are summed after `asyncio.gather` to get the total count.
The `write_lock` serialises writes to the shared JSONL file — without it, concurrent `await f.write(line)` calls would interleave partial lines.
---
## Block 6 — N concurrent consumers (`asyncio.gather`)
```python
prod_task = asyncio.create_task(producer())
counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
await prod_task
```
**Improvement 8 — concurrent consumers.** The original ran one consumer. Here `asyncio.gather` starts N consumer coroutines simultaneously, all reading from the same queue. Since `generate_presigned_url` is an async S3 API call, each consumer yields while waiting for the response, letting the others proceed. With N=4 and 1000 PDFs, peak in-flight presign requests is 4 instead of 1 — throughput roughly scales with `concurrency` up to the point where the producer can't keep the queue full.
`asyncio.create_task(producer())` schedules the producer to run concurrently with the consumers rather than sequentially before them. The `await prod_task` after gather ensures any producer exception is re-raised (not silently swallowed as it would be in a fire-and-forget task).
---
## Block 7 — Streaming manifest upload
```python
manifest_bytes = os.path.getsize(manifest_path)
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
with open(manifest_path, "rb") as f:
await s3.put_object(
Bucket=cfg["bucket"], Key=manifest_key, Body=f,
ContentType="application/x-ndjson",
)
```
**Improvement 6 — streaming upload.** The original read the entire manifest into memory with `await f.read()` before uploading. For a bucket with 100k PDFs, the JSONL manifest could be tens of MB — a full `read()` doubles peak RAM usage. Passing a sync file handle as `Body` lets aiobotocore read the file in chunks internally. Memory usage stays flat regardless of manifest size.
`os.path.getsize` is called before opening the file for upload so the size is available for the EMF block even after the file is deleted.
---
## Block 8 — Cleanup, metrics, and response
```python
os.unlink(manifest_path)
result = {
"count": count,
"errors": errors,
"manifest_key": manifest_key,
"manifest_url": manifest_url,
}
response_bytes = len(json.dumps(result))
duration_ms = (time.monotonic() - t0) * 1000
_log("complete", request_id=request_id, count=count, errors=errors,
pages=pages, duration_ms=round(duration_ms, 2))
_emit_emf({
"PDFsProcessed": count, "S3ListPages": pages,
"PresignCount": count, "ManifestBytes": manifest_bytes,
"ResponseBytes": response_bytes,
}, Function="sign_pdfs_optimized")
return result
```
`os.unlink` removes the temp file after upload. `/tmp` is shared across warm invocations — not cleaning up leaks storage toward the 512 MB (default) or 10 GB (max) `/tmp` cap.
`response_bytes = len(json.dumps(result))` captures how large the sync response would be. Lambda's synchronous response limit is 6 MB. This function returns a URL, not a body, so `response_bytes` is typically under 300 bytes — but measuring it makes the headroom concrete.
The final `_log("complete", ...)` and `_emit_emf(...)` emit together at the end so all counts are final. In the tester, the Structured logs panel shows the `complete` event and the EMF panel shows the five metric values.
---
## Block 9 — Entrypoint
```python
def handler(event, context):
request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
result = asyncio.run(_run(event, request_id))
return {"statusCode": 200, "body": json.dumps(result)}
```
`asyncio.run` creates a fresh event loop for each invocation. Lambda's Python runtime is synchronous at the handler boundary; `asyncio.run` is the correct bridge between the sync entrypoint and the async internals. A module-level `loop = asyncio.get_event_loop()` would work on the first invocation but is deprecated and unreliable across warm invocations.
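`getattr(context, "aws_request_id", ...)` falls back to a generated UUID when the context object doesn't have the attribute — which is the case when the function is run through `invoke.py`, which passes `None` as the context. The FastAPI runner passes a `LambdaContext` stand-in that does set `aws_request_id`, so there the runner's id is used.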
---
## Summary of improvements
| # | What changed | Why it matters |
|---|---|---|
| 1 | Config from event, env as fallback | One function serves many prefixes |
| 2 | Structured JSON logging (`_log`) | Queryable in Logs Insights; visible in tester |
| 3 | EMF metrics (`_emit_emf`) | Free CloudWatch metrics, no API call overhead |
| 4 | Producer `try/finally` with N sentinels | Consumers always exit; handler never hangs |
| 5 | Per-item `try/except` in consumer | One bad key doesn't crash the batch |
| 6 | Sync file handle as `Body` | Flat memory for any manifest size |
| 7 | `PageSize=1000` | Fewer S3 round-trips on large prefixes |
| 8 | N concurrent consumers via `asyncio.gather` | Presign throughput scales with `concurrency` |

View File

@@ -0,0 +1 @@
{}

View File

@@ -23,12 +23,14 @@ async def _run():
async def producer(): async def producer():
paginator = s3.get_paginator("list_objects_v2") paginator = s3.get_paginator("list_objects_v2")
async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}): try:
for obj in page.get("Contents", []) or []: async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}):
key = obj["Key"] for obj in page.get("Contents", []) or []:
if key.lower().endswith(".pdf"): key = obj["Key"]
await queue.put(key) if key.lower().endswith(".pdf"):
await queue.put(_DONE) await queue.put(key)
finally:
await queue.put(_DONE)
async def consumer(): async def consumer():
count = 0 count = 0

View File

@@ -0,0 +1,6 @@
# Deps for the sign_pdfs lambda. Bundled into its deployment zip when
# uploading to AWS; locally, the runner pod installs the union of all
# per-function requirements (see Dockerfile.lambda).
aioboto3>=15.0 # async S3 client used in handler.py
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp
boto3>=1.40 # sync S3 client used by seed.py (data setup utility)

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1,6 @@
{
"bucket": "my-company-reports-bucket",
"prefix": "2026/04/",
"page_size": 1000,
"concurrency": 4
}

View File

@@ -0,0 +1,165 @@
"""sign_pdfs_optimized — production-refined fork of sign_pdfs.
Same contract as the original (def handler(event, context) → {"statusCode": 200, "body": ...})
but with 8 production refinements applied side-by-side for the demo:
1. Config from the event payload (bucket / prefix / expiry / page_size / concurrency),
env vars are fallback only. Same function can serve many prefixes.
2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms.
3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount,
ManifestBytes, ResponseBytes.
4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel
per consumer, not a single _DONE).
5. Consumer per-item try/except — failed presigns count into errors, batch survives.
6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM.
7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes.
8. N concurrent consumers via asyncio.gather — presign throughput scales with
the concurrency knob in the event.
"""
import asyncio
import json
import os
import time
import uuid
import aioboto3
import aiofiles
_DONE = object()
def _cfg(event: dict) -> dict:
    """Build the per-invocation configuration.

    Values are taken from the event payload first; environment variables
    (and then hard-coded defaults) are the fallback. ``endpoint`` and
    ``queue_max`` are read from the environment only.

    Args:
        event: The Lambda event payload. ``None`` is treated as an empty dict.

    Returns:
        A dict with the keys ``bucket``, ``prefix``, ``expiry``, ``endpoint``,
        ``page_size``, ``concurrency`` and ``queue_max``.

    Raises:
        ValueError: If a numeric setting cannot be parsed by ``int()``.
    """
    event = event or {}
    env = os.environ

    # A non-positive concurrency would start zero consumers: nothing drains the
    # queue, so keys are silently dropped or the producer blocks on a full
    # queue. Always run at least one consumer.
    concurrency = max(1, int(event.get("concurrency") or 4))
    # 1000 is the S3 maximum page size; values outside 1..1000 are clamped.
    page_size = min(1000, max(1, int(event.get("page_size") or 1000)))

    return {
        "bucket": event.get("bucket") or env.get("BUCKET_NAME", "my-company-reports-bucket"),
        "prefix": event.get("prefix") or env.get("PREFIX", "2026/04/"),
        "expiry": int(event.get("expiry_seconds") or env.get("URL_EXPIRY_SECONDS", "900")),
        "endpoint": env.get("S3_ENDPOINT_URL") or None,
        "page_size": page_size,
        "concurrency": concurrency,
        "queue_max": int(env.get("QUEUE_MAX", "2000")),
    }
def _log(event_type: str, **fields):
    """Print one structured log record as a single JSON line on stdout.

    The record starts with an ``event`` key set to *event_type*; every keyword
    argument is then merged in as an additional top-level key.
    """
    record = {"event": event_type}
    record.update(fields)
    line = json.dumps(record)
    print(line)
def _emit_emf(metrics: dict, **dims):
    """Print one CloudWatch Embedded Metric Format (EMF) record to stdout.

    Args:
        metrics: Mapping of metric name to value. A name ending in ``Bytes``
            is declared with unit ``"Bytes"``; every other name with ``"Count"``.
        **dims: Dimension name/value pairs. Together they form the record's
            single dimension set.

    The dimension and metric values are written as top-level keys next to the
    ``_aws`` metadata object, in that order.
    """
    metric_defs = []
    for metric_name in metrics:
        unit = "Bytes" if metric_name.endswith("Bytes") else "Count"
        metric_defs.append({"Name": metric_name, "Unit": unit})

    directive = {
        "Namespace": "eth/sign_pdfs_optimized",
        "Dimensions": [list(dims)],
        "Metrics": metric_defs,
    }
    envelope = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # epoch milliseconds
            "CloudWatchMetrics": [directive],
        },
    }
    # Same key order as unpacking dims first, then metrics.
    envelope.update(dims)
    envelope.update(metrics)
    print(json.dumps(envelope))
async def _run(event: dict, request_id: str):
    """List PDFs under the configured prefix, presign each one, upload a manifest.

    A producer task pages through ``list_objects_v2`` and queues every key
    ending in ``.pdf``; ``cfg["concurrency"]`` consumers presign the keys and
    append ``{"key", "url"}`` JSON lines to a temp file in ``/tmp``. The file
    is then uploaded under ``manifests/`` and a presigned URL for it is
    returned.

    Args:
        event: Lambda event payload, passed to ``_cfg``.
        request_id: Identifier included in every structured log line.

    Returns:
        Dict with ``count`` (successful presigns), ``errors`` (failed items),
        ``manifest_key`` and ``manifest_url``.

    Raises:
        Exception: Anything raised by the producer (re-raised via
            ``await prod_task``) or by the manifest upload. The temp manifest
            is removed in every case.
    """
    cfg = _cfg(event)
    _log("start", request_id=request_id, bucket=cfg["bucket"], prefix=cfg["prefix"],
         page_size=cfg["page_size"], concurrency=cfg["concurrency"])
    t0 = time.monotonic()
    pages = 0
    errors = 0

    session = aioboto3.Session()
    manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"
    try:
        async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
            queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"])

            async def producer():
                nonlocal pages
                try:
                    paginator = s3.get_paginator("list_objects_v2")
                    async for page in paginator.paginate(
                        Bucket=cfg["bucket"], Prefix=cfg["prefix"],
                        PaginationConfig={"PageSize": cfg["page_size"]},
                    ):
                        pages += 1
                        for obj in page.get("Contents", []) or []:
                            key = obj["Key"]
                            if key.lower().endswith(".pdf"):
                                await queue.put(key)
                finally:
                    # one sentinel per consumer so each gather() task exits cleanly
                    for _ in range(cfg["concurrency"]):
                        await queue.put(_DONE)

            write_lock = asyncio.Lock()

            async with aiofiles.open(manifest_path, "w") as f:

                async def consumer():
                    nonlocal errors
                    local = 0  # successful presigns handled by this consumer
                    while True:
                        item = await queue.get()
                        if item is _DONE:
                            return local
                        try:
                            url = await s3.generate_presigned_url(
                                "get_object",
                                Params={"Bucket": cfg["bucket"], "Key": item},
                                ExpiresIn=cfg["expiry"],
                            )
                            line = json.dumps({"key": item, "url": url}) + "\n"
                            # Serialise writes to the shared manifest file.
                            async with write_lock:
                                await f.write(line)
                            local += 1
                        except Exception as exc:
                            # Per-item isolation: count and log, keep the batch going.
                            errors += 1
                            _log("presign_error", request_id=request_id, key=item, error=str(exc))

                prod_task = asyncio.create_task(producer())
                counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
                # Surfaces any exception the producer raised.
                await prod_task

            count = sum(counts)
            # Measured after the manifest is closed and before it is deleted.
            manifest_bytes = os.path.getsize(manifest_path)

            manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
            # Sync file handle as Body — aiobotocore reads in chunks instead of buffering
            # the whole manifest in memory like `body = await f.read()` would.
            with open(manifest_path, "rb") as manifest_file:
                await s3.put_object(
                    Bucket=cfg["bucket"], Key=manifest_key, Body=manifest_file,
                    ContentType="application/x-ndjson",
                )

            manifest_url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": cfg["bucket"], "Key": manifest_key},
                ExpiresIn=cfg["expiry"],
            )
    finally:
        # Remove the temp manifest on failure too, so a failed run does not
        # leave files behind in /tmp. The file may not exist if the failure
        # happened before it was created.
        try:
            os.unlink(manifest_path)
        except FileNotFoundError:
            pass

    result = {
        "count": count,
        "errors": errors,
        "manifest_key": manifest_key,
        "manifest_url": manifest_url,
    }
    response_bytes = len(json.dumps(result))
    duration_ms = (time.monotonic() - t0) * 1000

    _log("complete", request_id=request_id, count=count, errors=errors,
         pages=pages, duration_ms=round(duration_ms, 2))
    _emit_emf({
        "PDFsProcessed": count, "S3ListPages": pages,
        "PresignCount": count, "ManifestBytes": manifest_bytes,
        "ResponseBytes": response_bytes,
    }, Function="sign_pdfs_optimized")
    return result
def handler(event, context):
    """Lambda entrypoint: run the async pipeline and wrap its result.

    Args:
        event: Event payload forwarded to ``_run``.
        context: Lambda context object. Its ``aws_request_id`` is used when
            present; otherwise a freshly generated UUID is used instead.

    Returns:
        ``{"statusCode": 200, "body": <JSON string of the _run result>}``.
    """
    fallback_id = str(uuid.uuid4())
    request_id = getattr(context, "aws_request_id", fallback_id)
    # asyncio.run bridges the sync entrypoint to the async internals.
    payload = asyncio.run(_run(event, request_id))
    body = json.dumps(payload)
    return {"statusCode": 200, "body": body}

View File

@@ -0,0 +1,3 @@
# Same deps as sign_pdfs/ — this is the "after" demo fork, same dep footprint.
aioboto3>=15.0 # async S3 client
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp

View File

@@ -1,15 +1,65 @@
"""Shared CLI invoker for any function in functions/.
Usage (inside the lambda pod, via `bash ctrl/invoke.sh [name [event_json]]`):
python invoke.py # invokes the only function, or the first found
python invoke.py lambda_function # specific function
python invoke.py lambda_function '{}' # with event payload
"""
import importlib.util
import json import json
import os import os
import sys
from pathlib import Path
# Defaults match the in-cluster configmap so behavior is identical whether
# this script is invoked directly or via the FastAPI runner. `setdefault`
# never overrides an existing env var — the configmap wins in the pod.
os.environ.setdefault("BUCKET_NAME", "my-company-reports-bucket") os.environ.setdefault("BUCKET_NAME", "my-company-reports-bucket")
os.environ.setdefault("PREFIX", "2026/04/") os.environ.setdefault("PREFIX", "2026/04/")
os.environ.setdefault("S3_ENDPOINT_URL", "http://localhost:9000") os.environ.setdefault("S3_ENDPOINT_URL", "http://minio:9000")
os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin") os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin")
os.environ.setdefault("AWS_REGION", "us-east-1") os.environ.setdefault("AWS_REGION", "us-east-1")
from lambda_function import handler # noqa: E402 REPO_ROOT = Path(__file__).parent
FUNCTIONS_DIR = Path(os.environ.get("FUNCTIONS_DIR", str(REPO_ROOT / "functions")))
SHARED_DIR = REPO_ROOT / "shared"
# Make shared/ importable from any handler ("from shared import ..."). Matches
# how a Lambda Layer would expose code on PYTHONPATH at runtime.
if SHARED_DIR.exists() and str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
def _pick_default_function() -> str:
    """Return the name of the alphabetically first function folder.

    A folder qualifies when it sits directly under ``FUNCTIONS_DIR``, its name
    does not start with ``_``, and it contains a ``handler.py``.

    Exits the process with status 2 (message on stderr) when ``FUNCTIONS_DIR``
    is not a directory or holds no qualifying folder.
    """
    # Without this guard iterdir() raises FileNotFoundError and the CLI dies
    # with a traceback instead of the short message used for other failures.
    if not FUNCTIONS_DIR.is_dir():
        print(f"functions directory not found: {FUNCTIONS_DIR}", file=sys.stderr)
        sys.exit(2)
    candidates = sorted(
        d.name for d in FUNCTIONS_DIR.iterdir()
        if d.is_dir() and not d.name.startswith("_") and (d / "handler.py").exists()
    )
    if not candidates:
        print(f"no function folders with handler.py in {FUNCTIONS_DIR}", file=sys.stderr)
        sys.exit(2)
    return candidates[0]
def _load(name: str):
    """Import ``FUNCTIONS_DIR/<name>/handler.py`` and return the module.

    The module is executed under the name ``functions.<name>.handler``. The
    process exits with status 2 (message on stderr) when the file is missing
    or the loaded module has no ``handler`` attribute.
    """
    def _fail(message: str):
        print(message, file=sys.stderr)
        sys.exit(2)

    handler_path = FUNCTIONS_DIR / name / "handler.py"
    if not handler_path.exists():
        _fail(f"function not found: {handler_path}")

    module_name = f"functions.{name}.handler"
    spec = importlib.util.spec_from_file_location(module_name, handler_path)
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)

    if hasattr(loaded, "handler"):
        return loaded
    _fail(f"{handler_path} has no handler(event, context)")
if __name__ == "__main__": if __name__ == "__main__":
response = handler({}, None) name = sys.argv[1] if len(sys.argv) > 1 else _pick_default_function()
event = json.loads(sys.argv[2]) if len(sys.argv) > 2 else {}
module = _load(name)
response = module.handler(event, None)
print(json.dumps(response, indent=2)) print(json.dumps(response, indent=2))

View File

@@ -1,3 +1,5 @@
aioboto3>=15.0 # Root requirements = runner + invoker deps only. Per-function deps live
aiofiles>=23.2 # alongside each function (functions/<name>/requirements.txt) so they're
boto3>=1.40 # bundled with the function's deployment zip for AWS.
fastapi>=0.115
uvicorn[standard]>=0.32

432
runner.py Normal file
View File

@@ -0,0 +1,432 @@
"""Local Lambda runner — FastAPI wrapper that invokes any `handler(event, context)`
file in /app and reports AWS-equivalent metrics. Nothing in this file is touched
by the lambda function itself; functions stay verbatim-uploadable to AWS.
Features that are scaffolded now and "light up" later when matching improvements
land in the function:
- event payload pass-through (improvement #1: BUCKET/PREFIX from event)
- structured JSON log capture (improvement #2: JSON logging to stdout)
- EMF metric extraction (improvement #3: CloudWatch EMF embedded metrics)
Until the function emits those, the corresponding output fields are empty.
"""
import asyncio
import importlib.util
import io
import json
import math
import os
import resource
import subprocess
import sys
import sysconfig
import time
import traceback
import uuid
import zipfile
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
FUNCTIONS_DIR = Path(os.environ.get("FUNCTIONS_DIR", "/app/functions"))
SHARED_DIR = Path(os.environ.get("SHARED_DIR", "/app/shared"))
MAX_INVOCATIONS = int(os.environ.get("RUNNER_MAX_INVOCATIONS", "200"))
# Make shared/ importable for any function. Mirrors AWS Lambda Layer behavior
# (layer code is added to PYTHONPATH for all functions that attach it).
if SHARED_DIR.exists():
_repo_root = str(SHARED_DIR.parent)
if _repo_root not in sys.path:
sys.path.insert(0, _repo_root)
app = FastAPI(title="Lambda Local Runner")
_modules: dict = {} # name -> imported module (cache; presence = warm)
_module_deps: dict = {} # name -> set of sys.modules keys added during this function's init
_invocations: list[dict] = [] # newest last; capped at MAX_INVOCATIONS
class LambdaContext:
    """Lightweight local substitute for the context object AWS Lambda hands to a handler.

    Carries the request id, function name and version, a synthetic ARN, the
    memory limit, log group/stream names, and a deadline used by
    ``get_remaining_time_in_millis``.
    """

    def __init__(self, request_id: str, function_name: str, memory_mb: int, timeout_ms: int):
        today = time.strftime('%Y/%m/%d')
        arn = f"arn:aws:lambda:local:000000000000:function:{function_name}"

        self.aws_request_id = request_id
        self.function_name = function_name
        self.function_version = "$LATEST"
        self.invoked_function_arn = arn
        self.memory_limit_in_mb = memory_mb
        self.log_group_name = f"/aws/lambda/{function_name}"
        self.log_stream_name = f"local/{today}/[$LATEST]{request_id}"
        # Deadline in milliseconds on the monotonic clock.
        now_ms = time.monotonic() * 1000
        self._deadline_ms = now_ms + timeout_ms

    def get_remaining_time_in_millis(self) -> int:
        """Return whole milliseconds left before the deadline, never below 0."""
        remaining = self._deadline_ms - time.monotonic() * 1000
        if remaining > 0:
            return int(remaining)
        return 0
class InvokeRequest(BaseModel):
    """Request body accepted by the invoke endpoint."""

    # Event payload passed to the handler; an empty dict when omitted.
    event: dict = Field(default_factory=dict)
    # Memory size in MB. AWS Lambda sizes: 128, 256, 512, 1024, 1536, 2048,
    # 3008, 5120, 10240.
    memory_mb: int = Field(default=128)
    # AWS Lambda allows 1-900 seconds. Locally the value is recorded but the
    # handler is not killed, so the function runs exactly as written.
    timeout_ms: int = Field(default=30_000)
class ScriptRequest(BaseModel):
    """Request body carrying a list of string arguments."""

    # Defaults to an empty list when the caller sends none.
    # NOTE(review): the endpoint that consumes this model is outside this view.
    args: list[str] = Field(default_factory=list)
@app.get("/functions")
def list_functions():
    """List the invokable functions found under ``FUNCTIONS_DIR``.

    A function is a subfolder whose name does not start with ``_`` and whose
    ``handler.py`` defines a ``handler`` function. Each function lives in its
    own folder (matches AWS Lambda's deployment-package shape).

    Returns:
        ``{"functions": [{"name", "events"}, ...], "functions_dir": str}``.
        ``events`` holds the sorted ``events/*.json`` file names. When
        ``FUNCTIONS_DIR`` does not exist the list is empty and an ``error``
        key is added.
    """
    if not FUNCTIONS_DIR.exists():
        return {"functions": [], "functions_dir": str(FUNCTIONS_DIR), "error": "directory not found"}

    funcs: list[dict] = []
    for d in sorted(FUNCTIONS_DIR.iterdir()):
        if not d.is_dir() or d.name.startswith("_"):
            continue
        handler = d / "handler.py"
        if not handler.exists():
            continue
        try:
            text = handler.read_text()
        except (OSError, UnicodeDecodeError):
            # Unreadable or undecodable file: leave it out of the listing.
            continue
        # Match any `def handler(` so annotated or differently named
        # parameters (e.g. `event: dict`, `_context`) are still listed.
        # The CLI invoker only requires a `handler` attribute.
        if "def handler(" not in text:
            continue
        # Discover sample events (events/*.json) so the UI can populate a dropdown.
        events_dir = d / "events"
        events = sorted(p.name for p in events_dir.glob("*.json")) if events_dir.is_dir() else []
        funcs.append({"name": d.name, "events": events})
    return {"functions": funcs, "functions_dir": str(FUNCTIONS_DIR)}
@app.get("/functions/{name}/events/{filename}")
def get_event(name: str, filename: str):
    """Serve a sample event file so the UI can preview/select it.

    Args:
        name: Function folder name under FUNCTIONS_DIR.
        filename: File name inside that function's events/ folder.

    Returns:
        The parsed JSON content of the event file.

    Raises:
        HTTPException: 404 if the file does not exist (or a path component
            tries to leave the events folder), 400 if the file is not valid
            UTF-8 JSON.
    """
    # Both values come straight from the URL. A component such as ".." would
    # make the joined path point outside functions/<name>/events/, so reject
    # anything that is not a plain file/folder name.
    for part in (name, filename):
        if part in (".", "..") or "/" in part or "\\" in part:
            raise HTTPException(status_code=404, detail="event file not found")

    path = FUNCTIONS_DIR / name / "events" / filename
    if not path.is_file():
        raise HTTPException(status_code=404, detail="event file not found")
    try:
        # JSON is UTF-8 by specification; don't depend on the pod's locale.
        return json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise HTTPException(status_code=400, detail=f"invalid JSON in {path}: {e}") from e
@app.post("/invoke/{name}")
def invoke(name: str, req: InvokeRequest):
    """Invoke functions/<name>/handler.py and return the recorded invocation.

    Sync def so FastAPI runs us in a thread — that lets the function call
    `asyncio.run(...)` internally without nested-loop errors (which is how
    the current handler.py works).

    Args:
        name: Function folder name under FUNCTIONS_DIR.
        req: Event payload plus the memory/timeout values reported to the
            handler through its context object.

    Returns:
        The invocation record, after it has been stored via `_record`: result
        or error, captured stdout/stderr, the structured logs and EMF metrics
        parsed from stdout, and timing/memory metrics.

    Raises:
        HTTPException: 404 if handler.py does not exist, 400 if the loaded
            module has no `handler` attribute.
    """
    target = FUNCTIONS_DIR / name / "handler.py"
    if not target.exists():
        raise HTTPException(status_code=404, detail=f"{target} not found")

    invocation_id = str(uuid.uuid4())
    # A function is "cold" until its module has been loaded and cached once.
    cold_start = name not in _modules

    record: dict = {
        "invocation_id": invocation_id,
        "function": name,
        "timestamp": time.time(),
        "event": req.event,
        "result": None,
        "error": None,
        "stdout": "",
        "stderr": "",
        "structured_logs": [],
        "emf_metrics": [],
        "metrics": {
            "cold_start": cold_start,
            "init_duration_ms": None,
            "duration_ms": 0.0,
            "billed_duration_ms": 0,
            "memory_size_mb": req.memory_mb,
            "max_memory_used_mb": 0.0,
        },
    }

    # Cold-start: import the module and time the import. This matches AWS's
    # "Init Duration" — time to load module-level code (imports + module-scope
    # statements). On warm invocations this whole block is skipped.
    if cold_start:
        module_name = f"functions.{name}.handler"
        spec = importlib.util.spec_from_file_location(module_name, target)
        module = importlib.util.module_from_spec(spec)
        # Register the module before executing it, as the importlib recipe for
        # importing a source file does. Code that resolves a class's module via
        # sys.modules[cls.__module__] (dataclasses with string annotations,
        # pickle, typing.get_type_hints) fails inside the handler otherwise.
        # Registered before the snapshot so the recorded deps stay transitive-only.
        sys.modules[module_name] = module
        # Snapshot sys.modules so /reset can pop the transitive deps this function
        # pulled in (aioboto3 → aiobotocore → botocore, etc.). Without this the
        # second cold start is fake — heavy imports stay cached in the long-running
        # uvicorn process, and Force Cold reports unrealistically small numbers.
        sys_modules_before = set(sys.modules)
        t0 = time.monotonic()
        try:
            spec.loader.exec_module(module)
        except Exception as e:
            init_duration_ms = (time.monotonic() - t0) * 1000
            # Never leave a half-initialised module importable by name.
            sys.modules.pop(module_name, None)
            record["error"] = _format_exception(e)
            record["metrics"]["init_duration_ms"] = round(init_duration_ms, 2)
            return _record(record)
        init_duration_ms = (time.monotonic() - t0) * 1000
        _modules[name] = module
        _module_deps[name] = set(sys.modules) - sys_modules_before
        record["metrics"]["init_duration_ms"] = round(init_duration_ms, 2)

    module = _modules[name]
    if not hasattr(module, "handler"):
        # Name the file that was actually loaded; there is no "<name>.py".
        raise HTTPException(status_code=400, detail=f"{target} has no handler() function")

    context = LambdaContext(
        request_id=invocation_id,
        function_name=name,
        memory_mb=req.memory_mb,
        timeout_ms=req.timeout_ms,
    )

    stdout_buf = io.StringIO()
    stderr_buf = io.StringIO()
    t_handler = time.monotonic()
    try:
        # NOTE: redirect_stdout/redirect_stderr swap the process-global
        # sys.stdout/sys.stderr, so overlapping invocations on different worker
        # threads can capture each other's output.
        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
            result = module.handler(req.event, context)
            # Defensive: if a future async handler ever returns a coroutine
            # (AWS doesn't support that natively, but we might), run it.
            if asyncio.iscoroutine(result):
                result = asyncio.run(result)
        record["result"] = result
    except Exception as e:
        record["error"] = _format_exception(e)
    duration_ms = (time.monotonic() - t_handler) * 1000

    # ru_maxrss: kilobytes on Linux, bytes on macOS. We run on Linux in kind.
    # RUSAGE_SELF reports the peak for the whole runner process, not for this
    # invocation alone.
    rusage = resource.getrusage(resource.RUSAGE_SELF)
    max_memory_mb = rusage.ru_maxrss / 1024

    record["stdout"] = stdout_buf.getvalue()
    record["stderr"] = stderr_buf.getvalue()
    record["structured_logs"] = _extract_json_logs(record["stdout"])
    record["emf_metrics"] = _extract_emf_metrics(record["stdout"])
    record["metrics"]["duration_ms"] = round(duration_ms, 2)
    # Billed duration is the handler time rounded up to the next whole millisecond.
    record["metrics"]["billed_duration_ms"] = int(math.ceil(duration_ms))
    record["metrics"]["max_memory_used_mb"] = round(max_memory_mb, 2)

    return _record(record)
@app.get("/invocations")
def list_invocations(limit: int = 50):
    """Index of past invocations, newest first. Lightweight summary only —
    use /invocations/{id} for the full record.

    Args:
        limit: Maximum number of entries to return. Zero or a negative value
            returns no entries.

    Returns:
        {"invocations": [summary, ...], "total": number of stored invocations}.
    """
    # Guard the slice: `_invocations[-0:]` is the WHOLE list, and a negative
    # limit would slice from the front instead of the back.
    recent = _invocations[-limit:] if limit > 0 else []
    items = [
        {
            "invocation_id": r["invocation_id"],
            "function": r["function"],
            "timestamp": r["timestamp"],
            "cold_start": r["metrics"]["cold_start"],
            "duration_ms": r["metrics"]["duration_ms"],
            "init_duration_ms": r["metrics"]["init_duration_ms"],
            "max_memory_used_mb": r["metrics"]["max_memory_used_mb"],
            "ok": r["error"] is None,
        }
        for r in reversed(recent)
    ]
    return {"invocations": items, "total": len(_invocations)}
@app.get("/invocations/{invocation_id}")
def get_invocation(invocation_id: str):
    """Return the full stored record for one invocation, or 404 if unknown."""
    found = next(
        (rec for rec in _invocations if rec["invocation_id"] == invocation_id),
        None,
    )
    if found is None:
        raise HTTPException(status_code=404, detail="invocation not found")
    return found
@app.delete("/invocations")
def clear_invocations():
    """Drop every stored invocation record and report how many were removed."""
    removed = len(_invocations)
    # Emptied in place so the module-level list object stays the same.
    del _invocations[:]
    return {"cleared": removed}
@app.post("/reset")
def reset_modules():
    """Forget every loaded function so its next invocation is a cold start.

    Besides the module cache, this also evicts from sys.modules the transitive
    imports each function pulled in during its init, so the next invocation
    pays a realistic cold-start cost (re-importing aioboto3 → aiobotocore →
    botocore from disk, not a no-op against an already-warm uvicorn process).

    Returns:
        {"cleared": function names that were loaded,
         "transitive_modules_popped": count of entries removed from sys.modules}.
    """
    loaded = list(_modules)
    evicted = 0
    for fn in loaded:
        sys.modules.pop(f"functions.{fn}.handler", None)
        transitive = _module_deps.pop(fn, ())
        # Count only entries that were still present when popped.
        evicted += sum(
            1 for dep in transitive if sys.modules.pop(dep, None) is not None
        )
    _modules.clear()
    return {"cleared": loaded, "transitive_modules_popped": evicted}
@app.get("/functions/{name}/scripts")
def list_scripts(name: str):
    """List a function's support scripts: every .py file in its folder except
    handler.py and __init__.py, sorted by path. Responds 404 when the function
    folder does not exist."""
    func_dir = FUNCTIONS_DIR / name
    if not func_dir.is_dir():
        raise HTTPException(status_code=404, detail=f"function {name!r} not found")

    excluded = ("handler.py", "__init__.py")
    scripts = []
    for candidate in sorted(func_dir.glob("*.py")):
        if candidate.name in excluded:
            continue
        scripts.append(candidate.name)
    return {"scripts": scripts, "function": name}
@app.post("/scripts/{fn_name}/{script_name}")
def run_script(fn_name: str, script_name: str, req: ScriptRequest):
    """Run a support script from functions/<fn_name>/<script_name> with optional args.

    The script runs in a child process using the runner's own interpreter,
    with a 300-second limit.

    Args:
        fn_name: Function folder name under FUNCTIONS_DIR.
        script_name: File name of a .py script inside that folder.
        req: Command-line arguments passed to the script.

    Returns:
        {"returncode", "stdout", "stderr", "duration_ms"}. On timeout the
        return code is -1 and stderr carries the timeout message.

    Raises:
        HTTPException: 400 for an invalid function/script name or a non-.py
            script, 404 if the script file does not exist.
    """
    # Both path components come from the URL. Apply the same check to the
    # function name as to the script name, so fn_name=".." cannot reach
    # scripts outside the functions directory.
    if ".." in fn_name or "/" in fn_name:
        raise HTTPException(status_code=400, detail="invalid function name")
    if ".." in script_name or "/" in script_name:
        raise HTTPException(status_code=400, detail="invalid script name")
    if not script_name.endswith(".py"):
        raise HTTPException(status_code=400, detail="only .py scripts allowed")

    script_path = FUNCTIONS_DIR / fn_name / script_name
    # is_file() rather than exists(): a directory named "x.py" is not a script.
    if not script_path.is_file():
        raise HTTPException(status_code=404, detail=f"{script_path} not found")

    # Argument list, no shell: args reach the script verbatim.
    cmd = [sys.executable, str(script_path), *req.args]
    t0 = time.monotonic()
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        return {"returncode": -1, "stdout": "", "stderr": "timed out after 300 s",
                "duration_ms": 300_000.0}
    return {
        "returncode": result.returncode,
        "stdout": result.stdout,
        "stderr": result.stderr,
        "duration_ms": round((time.monotonic() - t0) * 1000, 2),
    }
@app.get("/packaging")
def packaging():
    """Static sizing report — what each function would ship as a Lambda deployment zip,
    what the shared layer would weigh, what the largest installed deps look like.
    All computed against the live pod filesystem so numbers are real, not extrapolated.

    Returns:
        A dict with:
        - "functions": per-function handler size, folder size and zipped folder size;
        - "dependencies": the 25 largest site-packages directories over 50 000 bytes;
        - "dependencies_total_bytes": total over all such directories (not only the top 25);
        - "shared_layer": raw and zipped size of the shared directory;
        - "limits": size limits, in bytes, for the report to compare against.
    """
    funcs = []
    # A missing functions directory produces an empty report rather than an
    # unhandled FileNotFoundError from iterdir().
    function_dirs = sorted(FUNCTIONS_DIR.iterdir()) if FUNCTIONS_DIR.is_dir() else []
    for d in function_dirs:
        if not d.is_dir() or d.name.startswith("_"):
            continue
        handler = d / "handler.py"
        if not handler.exists():
            continue
        funcs.append({
            "name": d.name,
            "handler_bytes": handler.stat().st_size,
            "folder_bytes": _dir_bytes(d),
            "folder_zip_bytes": _zip_bytes(d),
        })

    site_packages = _site_packages_dir()
    deps: list[dict] = []
    if site_packages:
        for child in sorted(site_packages.iterdir()):
            if not child.is_dir():
                continue
            # Skip private directories and package metadata.
            if child.name.startswith("_") or child.name.endswith(".dist-info"):
                continue
            b = _dir_bytes(child)
            # Ignore small packages to keep the report focused on heavy deps.
            if b > 50_000:
                deps.append({"name": child.name, "bytes": b})
    # Largest first.
    deps.sort(key=lambda x: -x["bytes"])

    shared_present = SHARED_DIR.exists()
    shared_bytes = _dir_bytes(SHARED_DIR) if shared_present else 0
    shared_zip = _zip_bytes(SHARED_DIR) if shared_present else 0

    return {
        "functions": funcs,
        "dependencies": deps[:25],
        "dependencies_total_bytes": sum(d["bytes"] for d in deps),
        "shared_layer": {"bytes": shared_bytes, "zip_bytes": shared_zip},
        "limits": {
            "zip_upload_max": 50 * 1024 * 1024,
            "unzipped_max": 250 * 1024 * 1024,
            "container_image_max": 10 * 1024 * 1024 * 1024,
            "tmp_default": 512 * 1024 * 1024,
            "tmp_max": 10 * 1024 * 1024 * 1024,
            "response_max": 6 * 1024 * 1024,
        },
    }
def _dir_bytes(path: Path) -> int:
total = 0
for p in path.rglob("*"):
if p.is_file():
try:
total += p.stat().st_size
except OSError:
pass
return total
def _zip_bytes(path: Path) -> int:
"""Compute deflate-zipped size without writing to disk."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for p in path.rglob("*"):
if p.is_file():
try:
zf.write(p, p.relative_to(path))
except OSError:
pass
return buf.getbuffer().nbytes
def _site_packages_dir() -> Path | None:
purelib = sysconfig.get_paths().get("purelib")
return Path(purelib) if purelib and Path(purelib).exists() else None
@app.get("/health")
def health():
    """Liveness probe: reports which functions are currently loaded and how
    many invocation records are stored."""
    loaded = list(_modules)
    stored = len(_invocations)
    return {"ok": True, "loaded_modules": loaded, "invocations": stored}
def _format_exception(e: BaseException) -> dict:
return {
"type": type(e).__name__,
"message": str(e),
"traceback": traceback.format_exc(),
}
def _extract_json_logs(stdout_text: str) -> list[dict]:
"""Parse JSON-per-line structured logs out of stdout. Fails silently —
until the function emits structured logs (improvement #2), this returns []."""
logs: list[dict] = []
for line in stdout_text.splitlines():
line = line.strip()
if not (line.startswith("{") and line.endswith("}")):
continue
try:
logs.append(json.loads(line))
except json.JSONDecodeError:
continue
return logs
def _extract_emf_metrics(stdout_text: str) -> list[dict]:
    """Pick the CloudWatch EMF records out of captured stdout. EMF format:
    {"_aws": {"CloudWatchMetrics": [...], "Timestamp": ...}, "<metric>": value, ...}

    A JSON log line counts as EMF when its "_aws" value is a dict containing
    "CloudWatchMetrics". Output with no such lines yields [].
    """
    return [
        entry
        for entry in _extract_json_logs(stdout_text)
        if isinstance(entry.get("_aws"), dict) and "CloudWatchMetrics" in entry["_aws"]
    ]
def _record(rec: dict) -> dict:
    """Append *rec* to the invocation history, trim the oldest entries so at
    most MAX_INVOCATIONS remain, and return *rec* unchanged."""
    _invocations.append(rec)
    overflow = len(_invocations) - MAX_INVOCATIONS
    if overflow > 0:
        # Oldest records sit at the front of the list.
        del _invocations[:overflow]
    return rec

21
shared/README.md Normal file
View File

@@ -0,0 +1,21 @@
# shared/
Cross-function Python modules — the local equivalent of an AWS Lambda Layer.
Anything in this directory is available to every function under
`functions/<name>/handler.py` as a regular import:
```python
# functions/sign_pdfs/handler.py
from shared import common_utils
```
The `shared/` directory is added to `sys.path` by the runner. For an AWS
deploy, you'd either:
1. Package it as a real Lambda Layer (preferred), and reference the layer ARN
from each function's deploy spec, **or**
2. Vendor a copy into each function's deployment zip (simpler, less DRY).
Currently empty — no cross-function code yet. First candidate would probably
be a structured-logging helper once multiple functions need it.