Compare commits
2 Commits
7c5aa14409
...
29b095d583
| Author | SHA1 | Date | |
|---|---|---|---|
| 29b095d583 | |||
| 6652cb26e6 |
@@ -2,9 +2,27 @@ FROM python:3.13-slim
|
|||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Function-specific deps first. Each function carries its own requirements.txt
|
||||||
|
# (so a real AWS deploy zips the function folder verbatim). Locally, the pod
|
||||||
|
# installs the union of all of them.
|
||||||
|
COPY functions/ ./functions/
|
||||||
|
RUN set -e; \
|
||||||
|
for r in functions/*/requirements.txt; do \
|
||||||
|
[ -f "$r" ] && pip install --no-cache-dir -r "$r"; \
|
||||||
|
done
|
||||||
|
|
||||||
|
# Runner deps (FastAPI + uvicorn). Lives only in the runner pod; NOT bundled
|
||||||
|
# with any function zip for AWS.
|
||||||
COPY requirements.txt ./
|
COPY requirements.txt ./
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
|
||||||
COPY lambda_function.py invoke.py seed.py ./
|
# Shared modules (Lambda-Layer equivalent) and generic tooling.
|
||||||
|
COPY shared/ ./shared/
|
||||||
|
COPY invoke.py runner.py ./
|
||||||
|
|
||||||
CMD ["sleep", "infinity"]
|
# uvicorn --reload restarts on .py change → every code edit produces a fresh
|
||||||
|
# cold start, matching how AWS Lambda's container lifecycle works when you
|
||||||
|
# redeploy. Watching the whole /app tree picks up function-folder edits too.
|
||||||
|
CMD ["uvicorn", "runner:app", \
|
||||||
|
"--host", "0.0.0.0", "--port", "8000", \
|
||||||
|
"--reload", "--reload-dir", "/app"]
|
||||||
|
|||||||
@@ -23,9 +23,16 @@ docker_build(
|
|||||||
dockerfile='Dockerfile.lambda',
|
dockerfile='Dockerfile.lambda',
|
||||||
ignore=['.git', 'def', '.venv', 'docs', '__pycache__', '.pytest_cache'],
|
ignore=['.git', 'def', '.venv', 'docs', '__pycache__', '.pytest_cache'],
|
||||||
live_update=[
|
live_update=[
|
||||||
sync('../lambda_function.py', '/app/lambda_function.py'),
|
# Whole functions/ directory — new files appear in the tester's
|
||||||
|
# function list automatically; edits to existing files cause uvicorn
|
||||||
|
# to drop them from the warm-cache so the next invoke is cold (the
|
||||||
|
# `reset_modules` endpoint also lets you force it manually).
|
||||||
|
sync('../functions', '/app/functions'),
|
||||||
sync('../invoke.py', '/app/invoke.py'),
|
sync('../invoke.py', '/app/invoke.py'),
|
||||||
sync('../seed.py', '/app/seed.py'),
|
sync('../seed.py', '/app/seed.py'),
|
||||||
|
# runner.py change → uvicorn --reload restarts the process → all
|
||||||
|
# function modules drop out of the cache, next invocation cold.
|
||||||
|
sync('../runner.py', '/app/runner.py'),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -45,7 +52,18 @@ k8s_resource('lambda', resource_deps=['minio'])
|
|||||||
k8s_resource('docs')
|
k8s_resource('docs')
|
||||||
k8s_resource('gateway', resource_deps=['docs', 'minio'])
|
k8s_resource('gateway', resource_deps=['docs', 'minio'])
|
||||||
|
|
||||||
|
# Hot-reload gateway Caddy on Caddyfile edit. configMapGenerator uses
|
||||||
|
# disableNameSuffixHash so the Deployment template doesn't change → kustomize
|
||||||
|
# won't roll the pod on its own. This local_resource closes the loop.
|
||||||
|
local_resource(
|
||||||
|
'gateway-reload',
|
||||||
|
cmd='kubectl --context kind-eth -n eth rollout restart deployment/gateway',
|
||||||
|
deps=['k8s/base/Caddyfile'],
|
||||||
|
resource_deps=['gateway'],
|
||||||
|
auto_init=False,
|
||||||
|
)
|
||||||
|
|
||||||
k8s_resource(
|
k8s_resource(
|
||||||
objects=['eth:namespace', 'eth-config:configmap'],
|
objects=['eth:namespace', 'eth-config:configmap', 'gateway-config:configmap'],
|
||||||
new_name='infra',
|
new_name='infra',
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,3 +1,12 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
|
# Invoke a function from functions/ directly via `python invoke.py`.
|
||||||
|
# Usage:
|
||||||
|
# ctrl/invoke.sh # default: first function found
|
||||||
|
# ctrl/invoke.sh lambda_function # specific function
|
||||||
|
# ctrl/invoke.sh lambda_function '{"key":"val"}' # with event payload
|
||||||
|
#
|
||||||
|
# For the same invocation through the FastAPI tester (with cold/warm + memory
|
||||||
|
# metrics) use the Lambda Tester tab at http://eth.local.ar or POST to
|
||||||
|
# http://eth.local.ar/runner/invoke/<name>.
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python invoke.py "$@"
|
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python invoke.py "$@"
|
||||||
|
|||||||
@@ -4,6 +4,22 @@
|
|||||||
}
|
}
|
||||||
|
|
||||||
eth.local.ar:80 {
|
eth.local.ar:80 {
|
||||||
|
# API surface for the local Lambda tester (FastAPI in the lambda pod).
|
||||||
|
# Path-based is fine for our own API — no SPA assumes ownership of `/`,
|
||||||
|
# and a single origin means no CORS headaches with the frontend at /.
|
||||||
|
handle_path /runner/* {
|
||||||
|
reverse_proxy lambda:8000
|
||||||
|
}
|
||||||
|
|
||||||
|
# Everything else → static docs viewer (the frontend lives here too).
|
||||||
|
handle {
|
||||||
|
reverse_proxy docs:80
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
docs.eth.local.ar:80 {
|
||||||
|
# Serve /docs.html for the root path; everything else (graphs, viewer.html) passes through.
|
||||||
|
rewrite / /docs.html
|
||||||
reverse_proxy docs:80
|
reverse_proxy docs:80
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,10 +12,14 @@ resources:
|
|||||||
- gateway.yaml
|
- gateway.yaml
|
||||||
|
|
||||||
# Generate the gateway Caddyfile ConfigMap from the standalone file.
|
# Generate the gateway Caddyfile ConfigMap from the standalone file.
|
||||||
# Hash suffix is on by default — when Caddyfile changes, the ConfigMap gets
|
# Hash suffix disabled so the name stays static — lets Tilt group it under
|
||||||
# a new hashed name, kustomize rewrites the Deployment volume reference,
|
# the 'infra' resource (no "uncategorized" pill). Trade-off: pod doesn't
|
||||||
# and the gateway pod restarts automatically with the new config.
|
# auto-restart on Caddyfile change; the Tiltfile has a local_resource
|
||||||
|
# 'gateway-reload' that does `kubectl rollout restart` whenever Caddyfile
|
||||||
|
# is edited, so the experience is the same in practice.
|
||||||
configMapGenerator:
|
configMapGenerator:
|
||||||
- name: gateway-config
|
- name: gateway-config
|
||||||
files:
|
files:
|
||||||
- Caddyfile
|
- Caddyfile
|
||||||
|
options:
|
||||||
|
disableNameSuffixHash: true
|
||||||
|
|||||||
@@ -16,8 +16,14 @@ spec:
|
|||||||
containers:
|
containers:
|
||||||
- name: lambda
|
- name: lambda
|
||||||
image: eth-lambda
|
image: eth-lambda
|
||||||
command: ["sleep", "infinity"]
|
# The container runs the FastAPI runner (see runner.py + Dockerfile).
|
||||||
|
# The CMD comes from the Dockerfile (uvicorn). Container also stays
|
||||||
|
# exec-able for `bash ctrl/seed.sh` / `bash ctrl/invoke.sh` which
|
||||||
|
# spawn separate python processes alongside uvicorn.
|
||||||
workingDir: /app
|
workingDir: /app
|
||||||
|
ports:
|
||||||
|
- name: http
|
||||||
|
containerPort: 8000
|
||||||
envFrom:
|
envFrom:
|
||||||
- configMapRef:
|
- configMapRef:
|
||||||
name: eth-config
|
name: eth-config
|
||||||
@@ -25,14 +31,33 @@ spec:
|
|||||||
- name: documents
|
- name: documents
|
||||||
mountPath: /mnt/documents
|
mountPath: /mnt/documents
|
||||||
readOnly: true
|
readOnly: true
|
||||||
|
readinessProbe:
|
||||||
|
httpGet:
|
||||||
|
path: /health
|
||||||
|
port: 8000
|
||||||
|
initialDelaySeconds: 3
|
||||||
|
periodSeconds: 5
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
memory: 128Mi
|
memory: 128Mi
|
||||||
cpu: 100m
|
cpu: 100m
|
||||||
limits:
|
limits:
|
||||||
memory: 512Mi
|
memory: 1Gi
|
||||||
volumes:
|
volumes:
|
||||||
- name: documents
|
- name: documents
|
||||||
hostPath:
|
hostPath:
|
||||||
path: /mnt/documents
|
path: /mnt/documents
|
||||||
type: Directory
|
type: Directory
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: lambda
|
||||||
|
namespace: eth
|
||||||
|
spec:
|
||||||
|
selector:
|
||||||
|
app: lambda
|
||||||
|
ports:
|
||||||
|
- name: http
|
||||||
|
port: 8000
|
||||||
|
targetPort: 8000
|
||||||
|
|||||||
@@ -1,3 +1,8 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
|
# Uploads the host's /mnt/documents tree into the in-cluster MinIO. Lives
|
||||||
|
# inside the sign_pdfs function folder because it's specific to *this*
|
||||||
|
# function's data shape (bucket "my-company-reports-bucket", prefix "2026/04/").
|
||||||
|
# Other functions will have their own seed scripts in their own folders.
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python seed.py /mnt/documents
|
kubectl --context kind-eth -n eth exec -i deploy/lambda -- \
|
||||||
|
python functions/sign_pdfs/seed.py /mnt/documents
|
||||||
|
|||||||
1616
docs/docs.html
Normal file
1616
docs/docs.html
Normal file
File diff suppressed because it is too large
Load Diff
1832
docs/index.html
1832
docs/index.html
File diff suppressed because it is too large
Load Diff
237
docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
Normal file
237
docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
Normal file
@@ -0,0 +1,237 @@
|
|||||||
|
# sign_pdfs_optimized — Walkthrough
|
||||||
|
|
||||||
|
> Production-refined fork of `sign_pdfs`. Applies 8 improvements at once; meant to be compared side-by-side with the original in the local tester.
|
||||||
|
|
||||||
|
The original `sign_pdfs` is the "before" snapshot — it is left untouched. This file (`sign_pdfs_optimized/handler.py`) is the "after": same contract, same output shape, all the rough edges from the original fixed.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 1 — Imports and sentinel
|
||||||
|
|
||||||
|
```python
|
||||||
|
import asyncio, json, os, time, uuid
|
||||||
|
import aioboto3
|
||||||
|
import aiofiles
|
||||||
|
|
||||||
|
_DONE = object()
|
||||||
|
```
|
||||||
|
|
||||||
|
Nothing new at the import level. `_DONE` is a sentinel object used to signal consumers that the producer has finished. Using a unique object (rather than `None` or a string) means no PDF key will ever accidentally match it.
|
||||||
|
|
||||||
|
**Original issue fixed here:** the original put a single `_DONE` on the queue regardless of how many consumers were running. With N consumers, one of them would consume the sentinel and exit, but the others would block forever waiting for a key that never arrives. The fix is one sentinel per consumer — that happens in the producer's `finally` block (Block 4).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 2 — Config from event (`_cfg`)
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _cfg(event: dict) -> dict:
|
||||||
|
return {
|
||||||
|
"bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
|
||||||
|
"prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"),
|
||||||
|
"expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
|
||||||
|
"endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
|
||||||
|
"page_size": int(event.get("page_size") or 1000),
|
||||||
|
"concurrency": int(event.get("concurrency") or 4),
|
||||||
|
"queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Improvement 1 — event-driven config.** The original hard-coded `bucket`, `prefix`, and `expiry` as module-level constants read once from env vars. That means one function deployment = one bucket/prefix combination. Here the event is the primary source; env vars are fallbacks. The same deployed function can serve `2026/04/`, `2026/05/`, or an entirely different bucket by changing only the event payload — no redeployment.
|
||||||
|
|
||||||
|
`endpoint` is env-only because it is infrastructure (the MinIO URL in local dev, absent in production) — callers should never need to specify it per-request.
|
||||||
|
|
||||||
|
`queue_max` bounds the in-memory queue so the producer can't buffer unbounded keys before consumers have a chance to drain it. With the defaults (1000 keys per page, 2000 queue slots), the producer can run at most about two pages ahead of the consumers before `queue.put` blocks.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 3 — Structured logging (`_log`) and EMF metrics (`_emit_emf`)
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _log(event_type: str, **fields):
|
||||||
|
print(json.dumps({"event": event_type, **fields}))
|
||||||
|
|
||||||
|
def _emit_emf(metrics: dict, **dims):
|
||||||
|
print(json.dumps({
|
||||||
|
"_aws": {
|
||||||
|
"Timestamp": int(time.time() * 1000),
|
||||||
|
"CloudWatchMetrics": [{
|
||||||
|
"Namespace": "eth/sign_pdfs_optimized",
|
||||||
|
"Dimensions": [list(dims.keys())],
|
||||||
|
"Metrics": [
|
||||||
|
{"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
|
||||||
|
for k in metrics
|
||||||
|
],
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
**dims, **metrics,
|
||||||
|
}))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Improvement 2 — structured JSON logging.** The original printed nothing. `_log` emits one JSON object per line to stdout. CloudWatch Logs Insights can then query fields directly:
|
||||||
|
|
||||||
|
```
|
||||||
|
filter event = "complete" | stats avg(duration_ms) by bin(5m)
|
||||||
|
```
|
||||||
|
|
||||||
|
The local runner's `_extract_json_logs` picks up these lines and surfaces them in the tester's "Structured logs" panel.
|
||||||
|
|
||||||
|
**Improvement 3 — EMF metrics.** `_emit_emf` writes a CloudWatch Embedded Metric Format (EMF) block to stdout. In AWS, CloudWatch Logs extracts the metrics from the function's log stream (locally, the runner's `_extract_emf_metrics` parses the same line) and publishes them as real CloudWatch metrics — no `PutMetricData` API call and no extra latency inside the handler. Normal custom-metric and log-ingestion pricing still applies; what is saved is the synchronous API call. The unit is inferred from the metric name: anything ending in `Bytes` → `"Bytes"`, everything else → `"Count"`.
|
||||||
|
|
||||||
|
EMF is emitted once at the end of the invocation, not once per item. Putting it inside a loop would flood CloudWatch with thousands of data points per invocation.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 4 — Producer with `try/finally`
|
||||||
|
|
||||||
|
```python
|
||||||
|
async def producer():
|
||||||
|
nonlocal pages
|
||||||
|
try:
|
||||||
|
paginator = s3.get_paginator("list_objects_v2")
|
||||||
|
async for page in paginator.paginate(
|
||||||
|
Bucket=cfg["bucket"], Prefix=cfg["prefix"],
|
||||||
|
PaginationConfig={"PageSize": cfg["page_size"]},
|
||||||
|
):
|
||||||
|
pages += 1
|
||||||
|
for obj in page.get("Contents", []) or []:
|
||||||
|
key = obj["Key"]
|
||||||
|
if key.lower().endswith(".pdf"):
|
||||||
|
await queue.put(key)
|
||||||
|
finally:
|
||||||
|
for _ in range(cfg["concurrency"]):
|
||||||
|
await queue.put(_DONE)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Improvement 4 — guaranteed consumer exit.** The `finally` block runs whether the producer succeeds, raises a `ClientError`, or is cancelled. Each of the N consumers gets its own `_DONE` sentinel. Without `finally`, an exception in the producer leaves all consumers blocked on `queue.get()` forever — the handler hangs until Lambda kills it at the function's configured timeout (15 minutes at most).
|
||||||
|
|
||||||
|
**Improvement 7 — `PageSize=1000`.** S3's `ListObjectsV2` returns at most 1000 keys per call. The original requested `PageSize=100`, so listing the same prefix took ten times as many round-trips; here the page size defaults to the maximum and is stated explicitly rather than left to the SDK's default. Fewer pages = fewer round-trip API calls for large prefixes.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 5 — Consumer with per-item error handling
|
||||||
|
|
||||||
|
```python
|
||||||
|
async def consumer():
|
||||||
|
nonlocal errors
|
||||||
|
local = 0
|
||||||
|
while True:
|
||||||
|
item = await queue.get()
|
||||||
|
if item is _DONE:
|
||||||
|
return local
|
||||||
|
try:
|
||||||
|
url = await s3.generate_presigned_url(
|
||||||
|
"get_object",
|
||||||
|
Params={"Bucket": cfg["bucket"], "Key": item},
|
||||||
|
ExpiresIn=cfg["expiry"],
|
||||||
|
)
|
||||||
|
line = json.dumps({"key": item, "url": url}) + "\n"
|
||||||
|
async with write_lock:
|
||||||
|
await f.write(line)
|
||||||
|
local += 1
|
||||||
|
except Exception as exc:
|
||||||
|
errors += 1
|
||||||
|
_log("presign_error", request_id=request_id, key=item, error=str(exc))
|
||||||
|
```
|
||||||
|
|
||||||
|
**Improvement 5 — per-item error isolation.** The original let a single `generate_presigned_url` failure propagate out of the consumer and crash the whole batch. Here, each item is wrapped in its own `try/except`. A bad key increments `errors` and logs the details; the consumer continues with the next key. The caller receives `{"count": N, "errors": M}` and can decide whether to retry the M failures.
|
||||||
|
|
||||||
|
`local` counts successful presigns per consumer. The N return values are summed after `asyncio.gather` to get the total count.
|
||||||
|
|
||||||
|
The `write_lock` serialises writes to the shared JSONL file — without it, concurrent `await f.write(line)` calls would interleave partial lines.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 6 — N concurrent consumers (`asyncio.gather`)
|
||||||
|
|
||||||
|
```python
|
||||||
|
prod_task = asyncio.create_task(producer())
|
||||||
|
counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
|
||||||
|
await prod_task
|
||||||
|
```
|
||||||
|
|
||||||
|
**Improvement 8 — concurrent consumers.** The original ran one consumer. Here `asyncio.gather` starts N consumer coroutines simultaneously, all reading from the same queue. Since `generate_presigned_url` is an async S3 API call, each consumer yields while waiting for the response, letting the others proceed. With N=4 and 1000 PDFs, peak in-flight presign requests is 4 instead of 1 — throughput roughly scales with `concurrency` up to the point where the producer can't keep the queue full.
|
||||||
|
|
||||||
|
`asyncio.create_task(producer())` schedules the producer to run concurrently with the consumers rather than sequentially before them. The `await prod_task` after gather ensures any producer exception is re-raised (not silently swallowed as it would be in a fire-and-forget task).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 7 — Streaming manifest upload
|
||||||
|
|
||||||
|
```python
|
||||||
|
manifest_bytes = os.path.getsize(manifest_path)
|
||||||
|
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
|
||||||
|
|
||||||
|
with open(manifest_path, "rb") as f:
|
||||||
|
await s3.put_object(
|
||||||
|
Bucket=cfg["bucket"], Key=manifest_key, Body=f,
|
||||||
|
ContentType="application/x-ndjson",
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Improvement 6 — streaming upload.** The original read the entire manifest into memory with `await f.read()` before uploading. For a bucket with 100k PDFs, the JSONL manifest could be tens of MB — a full `read()` doubles peak RAM usage. Passing a sync file handle as `Body` lets aiobotocore read the file in chunks internally. Memory usage stays flat regardless of manifest size.
|
||||||
|
|
||||||
|
`os.path.getsize` is called before opening the file for upload so the size is available for the EMF block even after the file is deleted.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 8 — Cleanup, metrics, and response
|
||||||
|
|
||||||
|
```python
|
||||||
|
os.unlink(manifest_path)
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"count": count,
|
||||||
|
"errors": errors,
|
||||||
|
"manifest_key": manifest_key,
|
||||||
|
"manifest_url": manifest_url,
|
||||||
|
}
|
||||||
|
response_bytes = len(json.dumps(result))
|
||||||
|
duration_ms = (time.monotonic() - t0) * 1000
|
||||||
|
|
||||||
|
_log("complete", request_id=request_id, count=count, errors=errors,
|
||||||
|
pages=pages, duration_ms=round(duration_ms, 2))
|
||||||
|
_emit_emf({
|
||||||
|
"PDFsProcessed": count, "S3ListPages": pages,
|
||||||
|
"PresignCount": count, "ManifestBytes": manifest_bytes,
|
||||||
|
"ResponseBytes": response_bytes,
|
||||||
|
}, Function="sign_pdfs_optimized")
|
||||||
|
|
||||||
|
return result
|
||||||
|
```
|
||||||
|
|
||||||
|
`os.unlink` removes the temp file after upload. `/tmp` is shared across warm invocations — not cleaning up leaks storage toward the 512 MB (default) or 10 GB (max) `/tmp` cap.
|
||||||
|
|
||||||
|
`response_bytes = len(json.dumps(result))` captures how large the sync response would be. Lambda's synchronous response limit is 6 MB. This function returns a URL, not a body, so `response_bytes` is typically under 300 bytes — but measuring it makes the headroom concrete.
|
||||||
|
|
||||||
|
The final `_log("complete", ...)` and `_emit_emf(...)` emit together at the end so all counts are final. In the tester, the Structured logs panel shows the `complete` event and the EMF panel shows the five metric values.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Block 9 — Entrypoint
|
||||||
|
|
||||||
|
```python
|
||||||
|
def handler(event, context):
|
||||||
|
request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
|
||||||
|
result = asyncio.run(_run(event, request_id))
|
||||||
|
return {"statusCode": 200, "body": json.dumps(result)}
|
||||||
|
```
|
||||||
|
|
||||||
|
`asyncio.run` creates a fresh event loop for each invocation. Lambda's Python runtime is synchronous at the handler boundary; `asyncio.run` is the correct bridge between the sync entrypoint and the async internals. A module-level `loop = asyncio.get_event_loop()` would work on the first invocation but is deprecated and unreliable across warm invocations.
|
||||||
|
|
||||||
|
`getattr(context, "aws_request_id", ...)` falls back to a generated UUID when the context object doesn't have the attribute — which is the case in the local tester, where the runner passes a minimal stub.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Summary of improvements
|
||||||
|
|
||||||
|
| # | What changed | Why it matters |
|
||||||
|
|---|---|---|
|
||||||
|
| 1 | Config from event, env as fallback | One function serves many prefixes |
|
||||||
|
| 2 | Structured JSON logging (`_log`) | Queryable in Logs Insights; visible in tester |
|
||||||
|
| 3 | EMF metrics (`_emit_emf`) | CloudWatch metrics straight from the log stream, no `PutMetricData` call overhead |
|
||||||
|
| 4 | Producer `try/finally` with N sentinels | Consumers always exit; handler never hangs |
|
||||||
|
| 5 | Per-item `try/except` in consumer | One bad key doesn't crash the batch |
|
||||||
|
| 6 | Sync file handle as `Body` | Flat memory for any manifest size |
|
||||||
|
| 7 | `PageSize=1000` | Fewer S3 round-trips on large prefixes |
|
||||||
|
| 8 | N concurrent consumers via `asyncio.gather` | Presign throughput scales with `concurrency` |
|
||||||
1
functions/sign_pdfs/events/default.json
Normal file
1
functions/sign_pdfs/events/default.json
Normal file
@@ -0,0 +1 @@
|
|||||||
|
{}
|
||||||
@@ -23,11 +23,13 @@ async def _run():
|
|||||||
|
|
||||||
async def producer():
|
async def producer():
|
||||||
paginator = s3.get_paginator("list_objects_v2")
|
paginator = s3.get_paginator("list_objects_v2")
|
||||||
|
try:
|
||||||
async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}):
|
async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}):
|
||||||
for obj in page.get("Contents", []) or []:
|
for obj in page.get("Contents", []) or []:
|
||||||
key = obj["Key"]
|
key = obj["Key"]
|
||||||
if key.lower().endswith(".pdf"):
|
if key.lower().endswith(".pdf"):
|
||||||
await queue.put(key)
|
await queue.put(key)
|
||||||
|
finally:
|
||||||
await queue.put(_DONE)
|
await queue.put(_DONE)
|
||||||
|
|
||||||
async def consumer():
|
async def consumer():
|
||||||
6
functions/sign_pdfs/requirements.txt
Normal file
6
functions/sign_pdfs/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
# Deps for the sign_pdfs lambda. Bundled into its deployment zip when
|
||||||
|
# uploading to AWS; locally, the runner pod installs the union of all
|
||||||
|
# per-function requirements (see Dockerfile.lambda).
|
||||||
|
aioboto3>=15.0 # async S3 client used in handler.py
|
||||||
|
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp
|
||||||
|
boto3>=1.40 # sync S3 client used by seed.py (data setup utility)
|
||||||
1
functions/sign_pdfs_optimized/events/default.json
Normal file
1
functions/sign_pdfs_optimized/events/default.json
Normal file
@@ -0,0 +1 @@
|
|||||||
|
{}
|
||||||
6
functions/sign_pdfs_optimized/events/tuned.json
Normal file
6
functions/sign_pdfs_optimized/events/tuned.json
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"bucket": "my-company-reports-bucket",
|
||||||
|
"prefix": "2026/04/",
|
||||||
|
"page_size": 1000,
|
||||||
|
"concurrency": 4
|
||||||
|
}
|
||||||
165
functions/sign_pdfs_optimized/handler.py
Normal file
165
functions/sign_pdfs_optimized/handler.py
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
"""sign_pdfs_optimized — production-refined fork of sign_pdfs.
|
||||||
|
|
||||||
|
Same contract as the original (def handler(event, context) → {"statusCode": 200, "body": ...})
|
||||||
|
but with 8 production refinements applied side-by-side for the demo:
|
||||||
|
|
||||||
|
1. Config from the event payload (bucket / prefix / expiry / page_size / concurrency),
|
||||||
|
env vars are fallback only. Same function can serve many prefixes.
|
||||||
|
2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms.
|
||||||
|
3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount,
|
||||||
|
ManifestBytes, ResponseBytes.
|
||||||
|
4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel
|
||||||
|
per consumer, not a single _DONE).
|
||||||
|
5. Consumer per-item try/except — failed presigns count into errors, batch survives.
|
||||||
|
6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM.
|
||||||
|
7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes.
|
||||||
|
8. N concurrent consumers via asyncio.gather — presign throughput scales with
|
||||||
|
the concurrency knob in the event.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import aioboto3
|
||||||
|
import aiofiles
|
||||||
|
|
||||||
|
_DONE = object()
|
||||||
|
|
||||||
|
|
||||||
|
def _cfg(event: dict) -> dict:
    """Resolve the configuration for one invocation.

    The event payload is the primary source for ``bucket``, ``prefix``,
    ``expiry_seconds``, ``page_size`` and ``concurrency``; environment
    variables (``BUCKET_NAME``, ``PREFIX``, ``URL_EXPIRY_SECONDS``) and
    built-in defaults are the fallbacks. ``S3_ENDPOINT_URL`` and
    ``QUEUE_MAX`` are read from the environment only.

    Args:
        event: invocation payload; ``None`` is treated as an empty payload.

    Returns:
        dict with the keys ``bucket``, ``prefix``, ``expiry``, ``endpoint``,
        ``page_size``, ``concurrency`` and ``queue_max``.

    Raises:
        ValueError: if a numeric setting cannot be parsed by ``int``.
    """
    # An invocation without a payload must behave like `{}` instead of
    # failing with AttributeError on `.get`.
    event = event or {}
    env = os.environ
    return {
        # Falsy event values ("" / 0 / None) deliberately fall through to the
        # environment / default, exactly like a missing key.
        "bucket": event.get("bucket") or env.get("BUCKET_NAME", "my-company-reports-bucket"),
        "prefix": event.get("prefix") or env.get("PREFIX", "2026/04/"),
        "expiry": int(event.get("expiry_seconds") or env.get("URL_EXPIRY_SECONDS", "900")),
        # Empty string is normalised to None so the client uses its default endpoint.
        "endpoint": env.get("S3_ENDPOINT_URL") or None,
        # Clamp to >= 1: a negative page size is never meaningful.
        "page_size": max(int(event.get("page_size") or 1000), 1),
        # Clamp to >= 1: with no consumers nothing drains the bounded queue
        # and the producer would block forever.
        "concurrency": max(int(event.get("concurrency") or 4), 1),
        "queue_max": int(env.get("QUEUE_MAX", "2000")),
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _log(event_type: str, **fields):
    """Print one structured log record as a single JSON line on stdout.

    Args:
        event_type: value stored under the ``"event"`` key of the record.
        **fields: additional key/value pairs merged into the record.
    """
    record = {"event": event_type, **fields}
    # default=str: logging must never raise because a field (an exception
    # object, a Path, a datetime, ...) is not JSON-native — stringify it.
    print(json.dumps(record, default=str))
|
||||||
|
|
||||||
|
|
||||||
|
def _emit_emf(metrics: dict, **dims):
    """Print one CloudWatch Embedded Metric Format (EMF) record on stdout.

    Args:
        metrics: mapping of metric name to numeric value. Names ending in
            ``Bytes`` are declared with unit ``"Bytes"``, all others with
            unit ``"Count"``.
        **dims: dimension name/value pairs; all of them form a single
            dimension set for every metric in the record.
    """
    # EMF requires the members referenced as dimensions to be strings, so a
    # call such as `_emit_emf(m, Shard=2)` is coerced instead of emitting a
    # numeric dimension value.
    dimensions = {name: str(value) for name, value in dims.items()}
    definitions = [
        {"Name": name, "Unit": "Bytes" if name.endswith("Bytes") else "Count"}
        for name in metrics
    ]
    print(json.dumps({
        "_aws": {
            # EMF timestamps are milliseconds since the Unix epoch.
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": "eth/sign_pdfs_optimized",
                "Dimensions": [list(dimensions)],
                "Metrics": definitions,
            }],
        },
        # Dimension and metric values live at the root of the record, next
        # to the `_aws` metadata that references them by name.
        **dimensions, **metrics,
    }))
|
||||||
|
|
||||||
|
|
||||||
|
async def _run(event: dict, request_id: str):
    """List PDFs under the configured prefix, presign each, and publish a manifest.

    A producer task pages through ``list_objects_v2`` and queues every key
    ending in ``.pdf``. N consumer coroutines presign the queued keys and
    append one JSON line per key to a temporary manifest in /tmp. The
    manifest is then uploaded to ``manifests/<uuid>.jsonl`` in the same
    bucket and a presigned URL for it is generated.

    Args:
        event: invocation payload, resolved through ``_cfg``.
        request_id: correlation id attached to the log records.

    Returns:
        dict with ``count`` (keys presigned and written), ``errors`` (keys
        that failed), ``manifest_key`` and ``manifest_url``.
    """
    cfg = _cfg(event)
    _log("start", request_id=request_id, bucket=cfg["bucket"], prefix=cfg["prefix"],
         page_size=cfg["page_size"], concurrency=cfg["concurrency"])
    t0 = time.monotonic()

    # Never run with fewer than one consumer: with none, nothing drains the
    # bounded queue and the producer would block forever.
    workers = max(cfg["concurrency"], 1)

    pages = 0
    errors = 0
    session = aioboto3.Session()
    async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
        queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"])
        manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"

        async def producer():
            nonlocal pages
            try:
                paginator = s3.get_paginator("list_objects_v2")
                async for page in paginator.paginate(
                    Bucket=cfg["bucket"], Prefix=cfg["prefix"],
                    PaginationConfig={"PageSize": cfg["page_size"]},
                ):
                    pages += 1
                    for obj in page.get("Contents", []) or []:
                        key = obj["Key"]
                        if key.lower().endswith(".pdf"):
                            await queue.put(key)
            finally:
                # one sentinel per consumer so each gather() task exits cleanly
                for _ in range(workers):
                    await queue.put(_DONE)

        write_lock = asyncio.Lock()
        try:
            async with aiofiles.open(manifest_path, "w") as f:

                async def consumer():
                    nonlocal errors
                    local = 0
                    while True:
                        item = await queue.get()
                        if item is _DONE:
                            return local
                        try:
                            url = await s3.generate_presigned_url(
                                "get_object",
                                Params={"Bucket": cfg["bucket"], "Key": item},
                                ExpiresIn=cfg["expiry"],
                            )
                            line = json.dumps({"key": item, "url": url}) + "\n"
                            # Serialise writes to the shared manifest file.
                            async with write_lock:
                                await f.write(line)
                            local += 1
                        except Exception as exc:
                            # Per-item isolation: count and log the failure,
                            # then continue with the next key.
                            errors += 1
                            _log("presign_error", request_id=request_id, key=item, error=str(exc))

                prod_task = asyncio.create_task(producer())
                counts = await asyncio.gather(*(consumer() for _ in range(workers)))
                # Re-raise any producer failure instead of dropping it.
                await prod_task

            # The aiofiles handle is closed here, so the on-disk size is final.
            count = sum(counts)
            manifest_bytes = os.path.getsize(manifest_path)
            manifest_key = f"manifests/{uuid.uuid4()}.jsonl"

            # Sync file handle as Body — aiobotocore reads in chunks instead of buffering
            # the whole manifest in memory like `body = await f.read()` would.
            with open(manifest_path, "rb") as body:
                await s3.put_object(
                    Bucket=cfg["bucket"], Key=manifest_key, Body=body,
                    ContentType="application/x-ndjson",
                )

            manifest_url = await s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": cfg["bucket"], "Key": manifest_key},
                ExpiresIn=cfg["expiry"],
            )
        finally:
            # Remove the temp manifest on failure as well as success, so a
            # failed listing or upload does not leave files behind in /tmp.
            try:
                os.unlink(manifest_path)
            except FileNotFoundError:
                pass

        result = {
            "count": count,
            "errors": errors,
            "manifest_key": manifest_key,
            "manifest_url": manifest_url,
        }
        response_bytes = len(json.dumps(result))
        duration_ms = (time.monotonic() - t0) * 1000

        _log("complete", request_id=request_id, count=count, errors=errors,
             pages=pages, duration_ms=round(duration_ms, 2))
        _emit_emf({
            "PDFsProcessed": count, "S3ListPages": pages,
            "PresignCount": count, "ManifestBytes": manifest_bytes,
            "ResponseBytes": response_bytes,
        }, Function="sign_pdfs_optimized")

        return result
|
||||||
|
|
||||||
|
|
||||||
|
def handler(event, context):
    """Synchronous Lambda entrypoint: run the async pipeline for one invocation.

    Args:
        event: invocation payload; ``None`` is treated as an empty payload.
        context: Lambda context object. Only ``aws_request_id`` is read; if
            it is missing or empty a random UUID is used instead.

    Returns:
        ``{"statusCode": 200, "body": <JSON string of the result dict>}``.
    """
    # `or` rather than a getattr default: also covers a context that has the
    # attribute but leaves it None/empty, which would log a null request id.
    request_id = getattr(context, "aws_request_id", None) or str(uuid.uuid4())
    # asyncio.run gives each invocation a fresh event loop.
    result = asyncio.run(_run(event or {}, request_id))
    return {"statusCode": 200, "body": json.dumps(result)}
|
||||||
3
functions/sign_pdfs_optimized/requirements.txt
Normal file
3
functions/sign_pdfs_optimized/requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
# Same deps as sign_pdfs/ — this is the "after" demo fork, same dep footprint.
|
||||||
|
aioboto3>=15.0 # async S3 client
|
||||||
|
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp
|
||||||
56
invoke.py
56
invoke.py
@@ -1,15 +1,65 @@
|
|||||||
|
"""Shared CLI invoker for any function in functions/.
|
||||||
|
|
||||||
|
Usage (inside the lambda pod, via `bash ctrl/invoke.sh [name [event_json]]`):
|
||||||
|
python invoke.py # invokes the only function, or the first found
|
||||||
|
python invoke.py lambda_function # specific function
|
||||||
|
python invoke.py lambda_function '{}' # with event payload
|
||||||
|
"""
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Defaults match the in-cluster configmap so behavior is identical whether
|
||||||
|
# this script is invoked directly or via the FastAPI runner. `setdefault`
|
||||||
|
# never overrides an existing env var — the configmap wins in the pod.
|
||||||
os.environ.setdefault("BUCKET_NAME", "my-company-reports-bucket")
|
os.environ.setdefault("BUCKET_NAME", "my-company-reports-bucket")
|
||||||
os.environ.setdefault("PREFIX", "2026/04/")
|
os.environ.setdefault("PREFIX", "2026/04/")
|
||||||
os.environ.setdefault("S3_ENDPOINT_URL", "http://localhost:9000")
|
os.environ.setdefault("S3_ENDPOINT_URL", "http://minio:9000")
|
||||||
os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin")
|
os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin")
|
||||||
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin")
|
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin")
|
||||||
os.environ.setdefault("AWS_REGION", "us-east-1")
|
os.environ.setdefault("AWS_REGION", "us-east-1")
|
||||||
|
|
||||||
from lambda_function import handler # noqa: E402
|
REPO_ROOT = Path(__file__).parent
|
||||||
|
FUNCTIONS_DIR = Path(os.environ.get("FUNCTIONS_DIR", str(REPO_ROOT / "functions")))
|
||||||
|
SHARED_DIR = REPO_ROOT / "shared"
|
||||||
|
|
||||||
|
# Make shared/ importable from any handler ("from shared import ..."). Matches
|
||||||
|
# how a Lambda Layer would expose code on PYTHONPATH at runtime.
|
||||||
|
if SHARED_DIR.exists() and str(REPO_ROOT) not in sys.path:
|
||||||
|
sys.path.insert(0, str(REPO_ROOT))
|
||||||
|
|
||||||
|
|
||||||
|
def _pick_default_function() -> str:
    """Return the alphabetically first function folder containing handler.py.

    Sub-folders of FUNCTIONS_DIR whose name starts with "_" are ignored.
    When no candidate exists, a message is printed to stderr and the process
    exits with status 2. A missing FUNCTIONS_DIR is treated the same way,
    rather than surfacing as an unhandled error from ``iterdir()``.
    """
    candidates: list[str] = []
    if FUNCTIONS_DIR.is_dir():
        candidates = sorted(
            d.name for d in FUNCTIONS_DIR.iterdir()
            if d.is_dir() and not d.name.startswith("_") and (d / "handler.py").exists()
        )
    if not candidates:
        print(f"no function folders with handler.py in {FUNCTIONS_DIR}", file=sys.stderr)
        sys.exit(2)
    return candidates[0]
|
||||||
|
|
||||||
|
|
||||||
|
def _load(name: str):
    """Import ``FUNCTIONS_DIR/<name>/handler.py`` and return the module.

    The module is executed from its file path under the synthetic name
    ``functions.<name>.handler``. If the file is missing, or the executed
    module exposes no ``handler`` attribute, the problem is printed to stderr
    and the process exits with status 2.
    """
    source = FUNCTIONS_DIR / name / "handler.py"
    problem = None
    loaded = None
    if not source.exists():
        problem = f"function not found: {source}"
    else:
        spec = importlib.util.spec_from_file_location(f"functions.{name}.handler", source)
        loaded = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(loaded)
        if not hasattr(loaded, "handler"):
            problem = f"{source} has no handler(event, context)"
    if problem is not None:
        print(problem, file=sys.stderr)
        sys.exit(2)
    return loaded
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
response = handler({}, None)
|
name = sys.argv[1] if len(sys.argv) > 1 else _pick_default_function()
|
||||||
|
event = json.loads(sys.argv[2]) if len(sys.argv) > 2 else {}
|
||||||
|
module = _load(name)
|
||||||
|
response = module.handler(event, None)
|
||||||
print(json.dumps(response, indent=2))
|
print(json.dumps(response, indent=2))
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
aioboto3>=15.0
|
# Root requirements = runner + invoker deps only. Per-function deps live
|
||||||
aiofiles>=23.2
|
# alongside each function (functions/<name>/requirements.txt) so they're
|
||||||
boto3>=1.40
|
# bundled with the function's deployment zip for AWS.
|
||||||
|
fastapi>=0.115
|
||||||
|
uvicorn[standard]>=0.32
|
||||||
|
|||||||
432
runner.py
Normal file
432
runner.py
Normal file
@@ -0,0 +1,432 @@
|
|||||||
|
"""Local Lambda runner — FastAPI wrapper that invokes any `handler(event, context)`
|
||||||
|
file in /app and reports AWS-equivalent metrics. Nothing in this file is touched
|
||||||
|
by the lambda function itself; functions stay verbatim-uploadable to AWS.
|
||||||
|
|
||||||
|
Features that are scaffolded now and "light up" later when matching improvements
|
||||||
|
land in the function:
|
||||||
|
- event payload pass-through (improvement #1: BUCKET/PREFIX from event)
|
||||||
|
- structured JSON log capture (improvement #2: JSON logging to stdout)
|
||||||
|
- EMF metric extraction (improvement #3: CloudWatch EMF embedded metrics)
|
||||||
|
Until the function emits those, the corresponding output fields are empty.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import importlib.util
|
||||||
|
import io
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import os
|
||||||
|
import resource
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import sysconfig
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
import uuid
|
||||||
|
import zipfile
|
||||||
|
from contextlib import redirect_stderr, redirect_stdout
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from fastapi import FastAPI, HTTPException
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
# Locations and limits; each can be overridden through the environment.
FUNCTIONS_DIR = Path(os.getenv("FUNCTIONS_DIR", "/app/functions"))
SHARED_DIR = Path(os.getenv("SHARED_DIR", "/app/shared"))
MAX_INVOCATIONS = int(os.getenv("RUNNER_MAX_INVOCATIONS", "200"))

# Put the directory that CONTAINS shared/ on sys.path so handlers can write
# "from shared import ...". Intended to mirror how a Lambda Layer exposes code
# to every function that attaches it.
if SHARED_DIR.exists():
    _repo_root = str(SHARED_DIR.parent)
    if _repo_root not in sys.path:
        sys.path.insert(0, _repo_root)

app = FastAPI(title="Lambda Local Runner")

# Process-wide runner state.
_modules: dict = {}            # function name -> imported module; presence means "warm"
_module_deps: dict = {}        # function name -> sys.modules keys added while it initialised
_invocations: list[dict] = []  # invocation records, newest last, capped at MAX_INVOCATIONS
|
||||||
|
|
||||||
|
|
||||||
|
class LambdaContext:
    """Small stand-in for the AWS Lambda context object.

    Carries the identifying attributes a handler may read (request id,
    function name/version, ARN, memory limit, log group/stream) plus a
    deadline used by ``get_remaining_time_in_millis``.
    """

    def __init__(self, request_id: str, function_name: str, memory_mb: int, timeout_ms: int):
        today = time.strftime("%Y/%m/%d")
        now_ms = time.monotonic() * 1000

        self.aws_request_id = request_id
        self.function_name = function_name
        self.function_version = "$LATEST"
        self.invoked_function_arn = f"arn:aws:lambda:local:000000000000:function:{function_name}"
        self.memory_limit_in_mb = memory_mb
        self.log_group_name = f"/aws/lambda/{function_name}"
        self.log_stream_name = f"local/{today}/[$LATEST]{request_id}"
        # Monotonic-clock deadline in milliseconds; only used for reporting.
        self._deadline_ms = now_ms + timeout_ms

    def get_remaining_time_in_millis(self) -> int:
        """Return whole milliseconds left before the deadline, never below 0."""
        remaining = int(self._deadline_ms - time.monotonic() * 1000)
        return remaining if remaining > 0 else 0
|
||||||
|
|
||||||
|
|
||||||
|
class InvokeRequest(BaseModel):
    """Request body for POST /invoke/{name}."""

    # Event payload handed to the handler unchanged; defaults to an empty dict.
    event: dict = Field(default_factory=dict)

    # Reported as the configured memory size. AWS Lambda memory sizes include
    # 128, 256, 512, 1024, 1536, 2048, 3008, 5120, 10240.
    memory_mb: int = 128

    # AWS Lambda allows 1-900 seconds. Locally the value only feeds the
    # context deadline; the handler is never interrupted.
    timeout_ms: int = 30_000
|
||||||
|
|
||||||
|
|
||||||
|
class ScriptRequest(BaseModel):
    """Request body for POST /scripts/{fn_name}/{script_name}."""

    # Extra command-line arguments appended after the script path.
    args: list[str] = Field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/functions")
def list_functions():
    """List invocable functions found under FUNCTIONS_DIR.

    A function is a sub-folder (name not starting with "_") containing a
    handler.py that defines a top-level ``handler``. The definition is matched
    by pattern rather than by one exact signature string, so annotated or
    ``async`` handlers are listed too. For each function, the names of the
    sample events in ``events/*.json`` are included.

    Returns a dict with ``functions`` and ``functions_dir``; when the
    directory does not exist the list is empty and an ``error`` key is added.
    """
    import re  # local import: this module's top-level imports do not include re

    if not FUNCTIONS_DIR.exists():
        return {"functions": [], "functions_dir": str(FUNCTIONS_DIR), "error": "directory not found"}

    handler_def = re.compile(r"^(?:async\s+)?def\s+handler\s*\(", re.MULTILINE)
    funcs: list[dict] = []
    for d in sorted(FUNCTIONS_DIR.iterdir()):
        if not d.is_dir() or d.name.startswith("_"):
            continue
        handler = d / "handler.py"
        if not handler.exists():
            continue
        try:
            text = handler.read_text()
        except (OSError, UnicodeDecodeError):
            # Unreadable or undecodable file: leave it out of the listing.
            continue
        if not handler_def.search(text):
            continue
        # Discover sample events (events/*.json) so the UI can populate a dropdown.
        events_dir = d / "events"
        events = sorted(p.name for p in events_dir.glob("*.json")) if events_dir.is_dir() else []
        funcs.append({"name": d.name, "events": events})
    return {"functions": funcs, "functions_dir": str(FUNCTIONS_DIR)}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/functions/{name}/events/{filename}")
def get_event(name: str, filename: str):
    """Serve a sample event file so the UI can preview/select it.

    Both path components are validated before touching the filesystem so a
    request cannot step outside FUNCTIONS_DIR with "..", "." or embedded
    separators.

    Raises:
        HTTPException 400: a path component is invalid, or the file is not
            valid JSON.
        HTTPException 404: the event file does not exist.
    """
    for part in (name, filename):
        if part in (".", "..") or "/" in part or "\\" in part:
            raise HTTPException(status_code=400, detail="invalid path component")

    path = FUNCTIONS_DIR / name / "events" / filename
    if not path.is_file():
        raise HTTPException(status_code=404, detail="event file not found")
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"invalid JSON in {path}: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/invoke/{name}")
def invoke(name: str, req: InvokeRequest):
    """Invoke ``FUNCTIONS_DIR/<name>/handler.py`` and return the invocation record.

    Declared as a sync ``def`` so the handler can call ``asyncio.run(...)``
    itself without nested-loop errors (FastAPI runs sync endpoints in a thread).

    The record holds the handler result or a formatted error, captured
    stdout/stderr, JSON log lines and EMF entries parsed from stdout, and
    metrics: cold start flag, init duration, duration, billed duration,
    configured memory and peak RSS.

    Raises:
        HTTPException 404: handler.py does not exist.
        HTTPException 400: the loaded module has no ``handler`` attribute.
    """
    handler_path = FUNCTIONS_DIR / name / "handler.py"
    if not handler_path.exists():
        raise HTTPException(status_code=404, detail=f"{handler_path} not found")

    request_id = str(uuid.uuid4())
    is_cold = name not in _modules
    metrics: dict = {
        "cold_start": is_cold,
        "init_duration_ms": None,
        "duration_ms": 0.0,
        "billed_duration_ms": 0,
        "memory_size_mb": req.memory_mb,
        "max_memory_used_mb": 0.0,
    }
    record: dict = {
        "invocation_id": request_id,
        "function": name,
        "timestamp": time.time(),
        "event": req.event,
        "result": None,
        "error": None,
        "stdout": "",
        "stderr": "",
        "structured_logs": [],
        "emf_metrics": [],
        "metrics": metrics,
    }

    # Cold start: execute the module and time it (imports plus module-scope
    # statements). Warm invocations skip this block entirely.
    if is_cold:
        spec = importlib.util.spec_from_file_location(
            f"functions.{name}.handler", handler_path,
        )
        fresh = importlib.util.module_from_spec(spec)
        # Remember which sys.modules entries existed beforehand so /reset can
        # later drop the transitive imports this function pulled in.
        known_before = set(sys.modules)
        started = time.monotonic()
        try:
            spec.loader.exec_module(fresh)
        except Exception as exc:
            metrics["init_duration_ms"] = round((time.monotonic() - started) * 1000, 2)
            record["error"] = _format_exception(exc)
            return _record(record)
        init_ms = (time.monotonic() - started) * 1000
        _modules[name] = fresh
        _module_deps[name] = set(sys.modules) - known_before
        metrics["init_duration_ms"] = round(init_ms, 2)

    module = _modules[name]
    if not hasattr(module, "handler"):
        raise HTTPException(status_code=400, detail=f"{name}.py has no handler() function")

    context = LambdaContext(
        request_id=request_id,
        function_name=name,
        memory_mb=req.memory_mb,
        timeout_ms=req.timeout_ms,
    )

    captured_out = io.StringIO()
    captured_err = io.StringIO()
    handler_started = time.monotonic()
    try:
        # NOTE(review): redirect_stdout/redirect_stderr swap the process-wide
        # streams, so overlapping invocations may interleave output — confirm
        # whether concurrent invokes are expected.
        with redirect_stdout(captured_out), redirect_stderr(captured_err):
            outcome = module.handler(req.event, context)
            # Defensive: run the coroutine if a handler ever returns one.
            if asyncio.iscoroutine(outcome):
                outcome = asyncio.run(outcome)
    except Exception as exc:
        record["error"] = _format_exception(exc)
    else:
        record["result"] = outcome
    elapsed_ms = (time.monotonic() - handler_started) * 1000

    # ru_maxrss is kilobytes on Linux (bytes on macOS); the runner targets Linux.
    peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024

    stdout_text = captured_out.getvalue()
    record["stdout"] = stdout_text
    record["stderr"] = captured_err.getvalue()
    record["structured_logs"] = _extract_json_logs(stdout_text)
    record["emf_metrics"] = _extract_emf_metrics(stdout_text)
    metrics["duration_ms"] = round(elapsed_ms, 2)
    metrics["billed_duration_ms"] = int(math.ceil(elapsed_ms))
    metrics["max_memory_used_mb"] = round(peak_rss_mb, 2)
    return _record(record)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/invocations")
def list_invocations(limit: int = 50):
    """Return lightweight summaries of past invocations, newest first.

    Use /invocations/{id} for the full record.

    Args:
        limit: maximum number of summaries to return. Zero or a negative value
            yields an empty list (a plain ``[-limit:]`` slice would return the
            whole history for 0 and skip the oldest entries for negatives).

    Returns:
        ``{"invocations": [...], "total": <number of stored records>}``.
    """
    recent = _invocations[-limit:] if limit > 0 else []
    items = [
        {
            "invocation_id": r["invocation_id"],
            "function": r["function"],
            "timestamp": r["timestamp"],
            "cold_start": r["metrics"]["cold_start"],
            "duration_ms": r["metrics"]["duration_ms"],
            "init_duration_ms": r["metrics"]["init_duration_ms"],
            "max_memory_used_mb": r["metrics"]["max_memory_used_mb"],
            "ok": r["error"] is None,
        }
        for r in reversed(recent)
    ]
    return {"invocations": items, "total": len(_invocations)}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/invocations/{invocation_id}")
def get_invocation(invocation_id: str):
    """Return the full stored record for one invocation, or 404 if unknown."""
    match = next(
        (rec for rec in _invocations if rec["invocation_id"] == invocation_id),
        None,
    )
    if match is None:
        raise HTTPException(status_code=404, detail="invocation not found")
    return match
|
||||||
|
|
||||||
|
|
||||||
|
@app.delete("/invocations")
def clear_invocations():
    """Drop every stored invocation record and report how many were removed."""
    removed = len(_invocations)
    del _invocations[:]
    return {"cleared": removed}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/reset")
def reset_modules():
    """Forget every loaded function so the next invocation is a cold start.

    Besides emptying the module cache, this removes from ``sys.modules`` the
    entries recorded while each function initialised, so those imports are
    executed again on the next load.

    Returns the cleared function names and the number of ``sys.modules``
    entries actually removed.
    """
    cleared = list(_modules)
    popped = 0
    for fn_name in cleared:
        sys.modules.pop(f"functions.{fn_name}.handler", None)
        recorded = _module_deps.pop(fn_name, ())
        popped += sum(1 for dep in recorded if sys.modules.pop(dep, None) is not None)
    _modules.clear()
    return {"cleared": cleared, "transitive_modules_popped": popped}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/functions/{name}/scripts")
def list_scripts(name: str):
    """List a function's support scripts: every .py file in its folder except
    handler.py and __init__.py, sorted. Responds 404 if the folder is missing."""
    func_dir = FUNCTIONS_DIR / name
    if not func_dir.is_dir():
        raise HTTPException(status_code=404, detail=f"function {name!r} not found")

    excluded = ("handler.py", "__init__.py")
    scripts = []
    for candidate in sorted(func_dir.glob("*.py")):
        if candidate.name in excluded:
            continue
        scripts.append(candidate.name)
    return {"scripts": scripts, "function": name}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/scripts/{fn_name}/{script_name}")
def run_script(fn_name: str, script_name: str, req: ScriptRequest):
    """Run ``functions/<fn_name>/<script_name>`` with optional args.

    The script runs in a child interpreter (``sys.executable``) with a
    300-second timeout. Both path components are checked for traversal so the
    executed file always sits inside a function folder.

    Returns:
        ``returncode``, ``stdout``, ``stderr`` and ``duration_ms``. On timeout
        the return code is -1 and the duration is reported as 300000.0.

    Raises:
        HTTPException 400: invalid function or script name, or not a .py file.
        HTTPException 404: the script does not exist.
    """
    if ".." in fn_name or "/" in fn_name:
        raise HTTPException(status_code=400, detail="invalid function name")
    if ".." in script_name or "/" in script_name:
        raise HTTPException(status_code=400, detail="invalid script name")
    if not script_name.endswith(".py"):
        raise HTTPException(status_code=400, detail="only .py scripts allowed")

    script_path = FUNCTIONS_DIR / fn_name / script_name
    if not script_path.exists():
        raise HTTPException(status_code=404, detail=f"{script_path} not found")

    # Argument list (no shell), so user-supplied args are never shell-parsed.
    cmd = [sys.executable, str(script_path), *req.args]
    t0 = time.monotonic()
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        return {"returncode": -1, "stdout": "", "stderr": "timed out after 300 s",
                "duration_ms": 300_000.0}
    return {
        "returncode": result.returncode,
        "stdout": result.stdout,
        "stderr": result.stderr,
        "duration_ms": round((time.monotonic() - t0) * 1000, 2),
    }
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/packaging")
def packaging():
    """Static sizing report computed against the live filesystem.

    Reports, per function folder, the handler size, the raw folder size and
    the deflate-zipped folder size; the largest installed packages in
    site-packages (directories over 50,000 bytes, top 25 listed, total over
    all of them); the raw and zipped size of the shared directory; and a table
    of size limits for comparison.

    A missing FUNCTIONS_DIR produces an empty ``functions`` list instead of
    an unhandled error from ``iterdir()``.
    """
    function_dirs = sorted(FUNCTIONS_DIR.iterdir()) if FUNCTIONS_DIR.is_dir() else []
    funcs = []
    for d in function_dirs:
        if not d.is_dir() or d.name.startswith("_"):
            continue
        handler_file = d / "handler.py"
        if not handler_file.exists():
            continue
        funcs.append({
            "name": d.name,
            "handler_bytes": handler_file.stat().st_size,
            "folder_bytes": _dir_bytes(d),
            "folder_zip_bytes": _zip_bytes(d),
        })

    site_packages = _site_packages_dir()
    deps: list[dict] = []
    if site_packages:
        for child in sorted(site_packages.iterdir()):
            if not child.is_dir():
                continue
            # Skip private folders and package metadata directories.
            if child.name.startswith("_") or child.name.endswith(".dist-info"):
                continue
            b = _dir_bytes(child)
            if b > 50_000:
                deps.append({"name": child.name, "bytes": b})
        deps.sort(key=lambda x: -x["bytes"])

    has_shared = SHARED_DIR.exists()
    shared_bytes = _dir_bytes(SHARED_DIR) if has_shared else 0
    shared_zip = _zip_bytes(SHARED_DIR) if has_shared else 0

    return {
        "functions": funcs,
        "dependencies": deps[:25],
        "dependencies_total_bytes": sum(dep["bytes"] for dep in deps),
        "shared_layer": {"bytes": shared_bytes, "zip_bytes": shared_zip},
        "limits": {
            "zip_upload_max": 50 * 1024 * 1024,
            "unzipped_max": 250 * 1024 * 1024,
            "container_image_max": 10 * 1024 * 1024 * 1024,
            "tmp_default": 512 * 1024 * 1024,
            "tmp_max": 10 * 1024 * 1024 * 1024,
            "response_max": 6 * 1024 * 1024,
        },
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _dir_bytes(path: Path) -> int:
|
||||||
|
total = 0
|
||||||
|
for p in path.rglob("*"):
|
||||||
|
if p.is_file():
|
||||||
|
try:
|
||||||
|
total += p.stat().st_size
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return total
|
||||||
|
|
||||||
|
|
||||||
|
def _zip_bytes(path: Path) -> int:
|
||||||
|
"""Compute deflate-zipped size without writing to disk."""
|
||||||
|
buf = io.BytesIO()
|
||||||
|
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||||
|
for p in path.rglob("*"):
|
||||||
|
if p.is_file():
|
||||||
|
try:
|
||||||
|
zf.write(p, p.relative_to(path))
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return buf.getbuffer().nbytes
|
||||||
|
|
||||||
|
|
||||||
|
def _site_packages_dir() -> Path | None:
|
||||||
|
purelib = sysconfig.get_paths().get("purelib")
|
||||||
|
return Path(purelib) if purelib and Path(purelib).exists() else None
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
def health():
    """Liveness probe: reports the loaded function names and the number of
    stored invocation records."""
    loaded = list(_modules)
    stored = len(_invocations)
    return {"ok": True, "loaded_modules": loaded, "invocations": stored}
|
||||||
|
|
||||||
|
|
||||||
|
def _format_exception(e: BaseException) -> dict:
|
||||||
|
return {
|
||||||
|
"type": type(e).__name__,
|
||||||
|
"message": str(e),
|
||||||
|
"traceback": traceback.format_exc(),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_json_logs(stdout_text: str) -> list[dict]:
|
||||||
|
"""Parse JSON-per-line structured logs out of stdout. Fails silently —
|
||||||
|
until the function emits structured logs (improvement #2), this returns []."""
|
||||||
|
logs: list[dict] = []
|
||||||
|
for line in stdout_text.splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if not (line.startswith("{") and line.endswith("}")):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
logs.append(json.loads(line))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
return logs
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_emf_metrics(stdout_text: str) -> list[dict]:
    """Pick the CloudWatch EMF records out of *stdout_text*.

    An EMF record is a JSON log line whose "_aws" value is a dict containing
    the key "CloudWatchMetrics", e.g.
    {"_aws": {"CloudWatchMetrics": [...], "Timestamp": ...}, "<metric>": value, ...}
    Returns the matching entries unchanged, or [] when there are none.
    """
    def _is_emf(entry: dict) -> bool:
        aws = entry.get("_aws")
        return isinstance(aws, dict) and "CloudWatchMetrics" in aws

    return [entry for entry in _extract_json_logs(stdout_text) if _is_emf(entry)]
|
||||||
|
|
||||||
|
|
||||||
|
def _record(rec: dict) -> dict:
    """Append *rec* to the invocation history and return it.

    When the history grows past MAX_INVOCATIONS, the oldest entries are
    dropped so that only the newest MAX_INVOCATIONS remain.
    """
    _invocations.append(rec)
    overflow = len(_invocations) - MAX_INVOCATIONS
    if overflow > 0:
        del _invocations[:overflow]
    return rec
|
||||||
21
shared/README.md
Normal file
21
shared/README.md
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
# shared/
|
||||||
|
|
||||||
|
Cross-function Python modules — the local equivalent of an AWS Lambda Layer.
|
||||||
|
|
||||||
|
Anything in this directory is available to every function under
|
||||||
|
`functions/<name>/handler.py` as a regular import:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# functions/sign_pdfs/handler.py
|
||||||
|
from shared import common_utils
|
||||||
|
```
|
||||||
|
|
||||||
|
The runner adds the parent of `shared/` to `sys.path`, so `shared` imports as a package. For an AWS
|
||||||
|
deploy, you'd either:
|
||||||
|
|
||||||
|
1. Package it as a real Lambda Layer (preferred), and reference the layer ARN
|
||||||
|
from each function's deploy spec, **or**
|
||||||
|
2. Vendor a copy into each function's deployment zip (simpler, less DRY).
|
||||||
|
|
||||||
|
Currently empty — no cross-function code yet. First candidate would probably
|
||||||
|
be a structured-logging helper once multiple functions need it.
|
||||||
Reference in New Issue
Block a user