Compare commits
2 Commits
7c5aa14409
...
29b095d583
| Author | SHA1 | Date | |
|---|---|---|---|
| 29b095d583 | |||
| 6652cb26e6 |
@@ -2,9 +2,27 @@ FROM python:3.13-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Function-specific deps first. Each function carries its own requirements.txt
|
||||
# (so a real AWS deploy zips the function folder verbatim). Locally, the pod
|
||||
# installs the union of all of them.
|
||||
COPY functions/ ./functions/
|
||||
RUN set -e; \
|
||||
for r in functions/*/requirements.txt; do \
|
||||
[ -f "$r" ] && pip install --no-cache-dir -r "$r"; \
|
||||
done
|
||||
|
||||
# Runner deps (FastAPI + uvicorn). Lives only in the runner pod; NOT bundled
|
||||
# with any function zip for AWS.
|
||||
COPY requirements.txt ./
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY lambda_function.py invoke.py seed.py ./
|
||||
# Shared modules (Lambda-Layer equivalent) and generic tooling.
|
||||
COPY shared/ ./shared/
|
||||
COPY invoke.py runner.py ./
|
||||
|
||||
CMD ["sleep", "infinity"]
|
||||
# uvicorn --reload restarts on .py change → every code edit produces a fresh
|
||||
# cold start, matching how AWS Lambda's container lifecycle works when you
|
||||
# redeploy. Watching the whole /app tree picks up function-folder edits too.
|
||||
CMD ["uvicorn", "runner:app", \
|
||||
"--host", "0.0.0.0", "--port", "8000", \
|
||||
"--reload", "--reload-dir", "/app"]
|
||||
|
||||
@@ -23,9 +23,16 @@ docker_build(
|
||||
dockerfile='Dockerfile.lambda',
|
||||
ignore=['.git', 'def', '.venv', 'docs', '__pycache__', '.pytest_cache'],
|
||||
live_update=[
|
||||
sync('../lambda_function.py', '/app/lambda_function.py'),
|
||||
# Whole functions/ directory — new files appear in the tester's
|
||||
# function list automatically; edits to existing files cause uvicorn
|
||||
# to drop them from the warm-cache so the next invoke is cold (the
|
||||
# `reset_modules` endpoint also lets you force it manually).
|
||||
sync('../functions', '/app/functions'),
|
||||
sync('../invoke.py', '/app/invoke.py'),
|
||||
sync('../seed.py', '/app/seed.py'),
|
||||
# runner.py change → uvicorn --reload restarts the process → all
|
||||
# function modules drop out of the cache, next invocation cold.
|
||||
sync('../runner.py', '/app/runner.py'),
|
||||
],
|
||||
)
|
||||
|
||||
@@ -45,7 +52,18 @@ k8s_resource('lambda', resource_deps=['minio'])
|
||||
k8s_resource('docs')
|
||||
k8s_resource('gateway', resource_deps=['docs', 'minio'])
|
||||
|
||||
# Hot-reload gateway Caddy on Caddyfile edit. configMapGenerator uses
|
||||
# disableNameSuffixHash so the Deployment template doesn't change → kustomize
|
||||
# won't roll the pod on its own. This local_resource closes the loop.
|
||||
local_resource(
|
||||
'gateway-reload',
|
||||
cmd='kubectl --context kind-eth -n eth rollout restart deployment/gateway',
|
||||
deps=['k8s/base/Caddyfile'],
|
||||
resource_deps=['gateway'],
|
||||
auto_init=False,
|
||||
)
|
||||
|
||||
k8s_resource(
|
||||
objects=['eth:namespace', 'eth-config:configmap'],
|
||||
objects=['eth:namespace', 'eth-config:configmap', 'gateway-config:configmap'],
|
||||
new_name='infra',
|
||||
)
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
#!/usr/bin/env bash
|
||||
# Invoke a function from functions/ directly via `python invoke.py`.
|
||||
# Usage:
|
||||
# ctrl/invoke.sh # default: first function found
|
||||
# ctrl/invoke.sh lambda_function # specific function
|
||||
# ctrl/invoke.sh lambda_function '{"key":"val"}' # with event payload
|
||||
#
|
||||
# For the same invocation through the FastAPI tester (with cold/warm + memory
|
||||
# metrics) use the Lambda Tester tab at http://eth.local.ar or POST to
|
||||
# http://eth.local.ar/runner/invoke/<name>.
|
||||
set -euo pipefail
|
||||
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python invoke.py "$@"
|
||||
|
||||
@@ -4,6 +4,22 @@
|
||||
}
|
||||
|
||||
eth.local.ar:80 {
|
||||
# API surface for the local Lambda tester (FastAPI in the lambda pod).
|
||||
# Path-based is fine for our own API — no SPA assumes ownership of `/`,
|
||||
# and a single origin means no CORS headaches with the frontend at /.
|
||||
handle_path /runner/* {
|
||||
reverse_proxy lambda:8000
|
||||
}
|
||||
|
||||
# Everything else → static docs viewer (the frontend lives here too).
|
||||
handle {
|
||||
reverse_proxy docs:80
|
||||
}
|
||||
}
|
||||
|
||||
docs.eth.local.ar:80 {
|
||||
# Serve /docs.html for the root path; everything else (graphs, viewer.html) passes through.
|
||||
rewrite / /docs.html
|
||||
reverse_proxy docs:80
|
||||
}
|
||||
|
||||
|
||||
@@ -12,10 +12,14 @@ resources:
|
||||
- gateway.yaml
|
||||
|
||||
# Generate the gateway Caddyfile ConfigMap from the standalone file.
|
||||
# Hash suffix is on by default — when Caddyfile changes, the ConfigMap gets
|
||||
# a new hashed name, kustomize rewrites the Deployment volume reference,
|
||||
# and the gateway pod restarts automatically with the new config.
|
||||
# Hash suffix disabled so the name stays static — lets Tilt group it under
|
||||
# the 'infra' resource (no "uncategorized" pill). Trade-off: pod doesn't
|
||||
# auto-restart on Caddyfile change; the Tiltfile has a local_resource
|
||||
# 'gateway-reload' that does `kubectl rollout restart` whenever Caddyfile
|
||||
# is edited, so the experience is the same in practice.
|
||||
configMapGenerator:
|
||||
- name: gateway-config
|
||||
files:
|
||||
- Caddyfile
|
||||
options:
|
||||
disableNameSuffixHash: true
|
||||
|
||||
@@ -16,8 +16,14 @@ spec:
|
||||
containers:
|
||||
- name: lambda
|
||||
image: eth-lambda
|
||||
command: ["sleep", "infinity"]
|
||||
# The container runs the FastAPI runner (see runner.py + Dockerfile).
|
||||
# The CMD comes from the Dockerfile (uvicorn). Container also stays
|
||||
# exec-able for `bash ctrl/seed.sh` / `bash ctrl/invoke.sh` which
|
||||
# spawn separate python processes alongside uvicorn.
|
||||
workingDir: /app
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: 8000
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: eth-config
|
||||
@@ -25,14 +31,33 @@ spec:
|
||||
- name: documents
|
||||
mountPath: /mnt/documents
|
||||
readOnly: true
|
||||
readinessProbe:
|
||||
httpGet:
|
||||
path: /health
|
||||
port: 8000
|
||||
initialDelaySeconds: 3
|
||||
periodSeconds: 5
|
||||
resources:
|
||||
requests:
|
||||
memory: 128Mi
|
||||
cpu: 100m
|
||||
limits:
|
||||
memory: 512Mi
|
||||
memory: 1Gi
|
||||
volumes:
|
||||
- name: documents
|
||||
hostPath:
|
||||
path: /mnt/documents
|
||||
type: Directory
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: lambda
|
||||
namespace: eth
|
||||
spec:
|
||||
selector:
|
||||
app: lambda
|
||||
ports:
|
||||
- name: http
|
||||
port: 8000
|
||||
targetPort: 8000
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
# Uploads the host's /mnt/documents tree into the in-cluster MinIO. Lives
|
||||
# inside the sign_pdfs function folder because it's specific to *this*
|
||||
# function's data shape (bucket "my-company-reports-bucket", prefix "2026/04/").
|
||||
# Other functions will have their own seed scripts in their own folders.
|
||||
set -euo pipefail
|
||||
kubectl --context kind-eth -n eth exec -i deploy/lambda -- python seed.py /mnt/documents
|
||||
kubectl --context kind-eth -n eth exec -i deploy/lambda -- \
|
||||
python functions/sign_pdfs/seed.py /mnt/documents
|
||||
|
||||
1616
docs/docs.html
Normal file
1616
docs/docs.html
Normal file
File diff suppressed because it is too large
Load Diff
1832
docs/index.html
1832
docs/index.html
File diff suppressed because it is too large
Load Diff
237
docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
Normal file
237
docs/lambdas-md/lambda-20-sign-pdfs-optimized.md
Normal file
@@ -0,0 +1,237 @@
|
||||
# sign_pdfs_optimized — Walkthrough
|
||||
|
||||
> Production-refined fork of `sign_pdfs`. Applies 8 improvements at once; meant to be compared side-by-side with the original in the local tester.
|
||||
|
||||
The original `sign_pdfs` is the "before" snapshot — it is left untouched. This file (`sign_pdfs_optimized/handler.py`) is the "after": same contract, same output shape, all the rough edges from the original fixed.
|
||||
|
||||
---
|
||||
|
||||
## Block 1 — Imports and sentinel
|
||||
|
||||
```python
|
||||
import asyncio, json, os, time, uuid
|
||||
import aioboto3
|
||||
import aiofiles
|
||||
|
||||
_DONE = object()
|
||||
```
|
||||
|
||||
Nothing new at the import level. `_DONE` is a sentinel object used to signal consumers that the producer has finished. Using a unique object (rather than `None` or a string) means no PDF key will ever accidentally match it.
|
||||
|
||||
**Original issue fixed here:** the original put a single `_DONE` on the queue regardless of how many consumers were running. With N consumers, one of them would consume the sentinel and exit, but the others would block forever waiting for a key that never arrives. The fix is one sentinel per consumer — that happens in the producer's `finally` block (Block 4).
|
||||
|
||||
---
|
||||
|
||||
## Block 2 — Config from event (`_cfg`)
|
||||
|
||||
```python
|
||||
def _cfg(event: dict) -> dict:
|
||||
return {
|
||||
"bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
|
||||
"prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"),
|
||||
"expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
|
||||
"endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
|
||||
"page_size": int(event.get("page_size") or 1000),
|
||||
"concurrency": int(event.get("concurrency") or 4),
|
||||
"queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
|
||||
}
|
||||
```
|
||||
|
||||
**Improvement 1 — event-driven config.** The original hard-coded `bucket`, `prefix`, and `expiry` as module-level constants read once from env vars. That means one function deployment = one bucket/prefix combination. Here the event is the primary source; env vars are fallbacks. The same deployed function can serve `2026/04/`, `2026/05/`, or an entirely different bucket by changing only the event payload — no redeployment.
|
||||
|
||||
`endpoint` is env-only because it is infrastructure (the MinIO URL in local dev, absent in production) — callers should never need to specify it per-request.
|
||||
|
||||
`queue_max` bounds the in-memory queue so the producer can't buffer unbounded keys before consumers have a chance to drain it. At 1000 keys/page × 2000 queue slots, the producer is at most 2 pages ahead of the consumers.
|
||||
|
||||
---
|
||||
|
||||
## Block 3 — Structured logging (`_log`) and EMF metrics (`_emit_emf`)
|
||||
|
||||
```python
|
||||
def _log(event_type: str, **fields):
|
||||
print(json.dumps({"event": event_type, **fields}))
|
||||
|
||||
def _emit_emf(metrics: dict, **dims):
|
||||
print(json.dumps({
|
||||
"_aws": {
|
||||
"Timestamp": int(time.time() * 1000),
|
||||
"CloudWatchMetrics": [{
|
||||
"Namespace": "eth/sign_pdfs_optimized",
|
||||
"Dimensions": [list(dims.keys())],
|
||||
"Metrics": [
|
||||
{"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
|
||||
for k in metrics
|
||||
],
|
||||
}],
|
||||
},
|
||||
**dims, **metrics,
|
||||
}))
|
||||
```
|
||||
|
||||
**Improvement 2 — structured JSON logging.** The original printed nothing. `_log` emits one JSON object per line to stdout. CloudWatch Logs Insights can then query fields directly:
|
||||
|
||||
```
|
||||
filter event = "complete" | stats avg(duration_ms) by bin(5m)
|
||||
```
|
||||
|
||||
The local runner's `_extract_json_logs` picks up these lines and surfaces them in the tester's "Structured logs" panel.
|
||||
|
||||
**Improvement 3 — EMF metrics.** `_emit_emf` writes a CloudWatch Embedded Metrics Format block to stdout. The Lambda runtime (and locally, the runner's `_extract_emf_metrics`) parses this and publishes real CloudWatch metrics — no `PutMetricData` API call, no extra latency, no per-metric cost. The unit is inferred from the metric name: anything ending in `Bytes` → `"Bytes"`, everything else → `"Count"`.
|
||||
|
||||
EMF is emitted once at the end of the invocation, not once per item. Putting it inside a loop would flood CloudWatch with thousands of data points per invocation.
|
||||
|
||||
---
|
||||
|
||||
## Block 4 — Producer with `try/finally`
|
||||
|
||||
```python
|
||||
async def producer():
|
||||
nonlocal pages
|
||||
try:
|
||||
paginator = s3.get_paginator("list_objects_v2")
|
||||
async for page in paginator.paginate(
|
||||
Bucket=cfg["bucket"], Prefix=cfg["prefix"],
|
||||
PaginationConfig={"PageSize": cfg["page_size"]},
|
||||
):
|
||||
pages += 1
|
||||
for obj in page.get("Contents", []) or []:
|
||||
key = obj["Key"]
|
||||
if key.lower().endswith(".pdf"):
|
||||
await queue.put(key)
|
||||
finally:
|
||||
for _ in range(cfg["concurrency"]):
|
||||
await queue.put(_DONE)
|
||||
```
|
||||
|
||||
**Improvement 4 — guaranteed consumer exit.** The `finally` block runs whether the producer succeeds, raises a `ClientError`, or is cancelled. Each of the N consumers gets its own `_DONE` sentinel. Without `finally`, an exception in the producer leaves all consumers blocked on `queue.get()` forever — the handler hangs and eventually times out at 15 minutes.
|
||||
|
||||
**Improvement 7 — `PageSize=1000`.** 1000 is the hard maximum number of keys S3's `ListObjectsV2` returns per page; setting it explicitly pins the paginator to that maximum instead of relying on SDK defaults. Fewer pages = fewer round-trip API calls for large prefixes.
|
||||
|
||||
---
|
||||
|
||||
## Block 5 — Consumer with per-item error handling
|
||||
|
||||
```python
|
||||
async def consumer():
|
||||
nonlocal errors
|
||||
local = 0
|
||||
while True:
|
||||
item = await queue.get()
|
||||
if item is _DONE:
|
||||
return local
|
||||
try:
|
||||
url = await s3.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": cfg["bucket"], "Key": item},
|
||||
ExpiresIn=cfg["expiry"],
|
||||
)
|
||||
line = json.dumps({"key": item, "url": url}) + "\n"
|
||||
async with write_lock:
|
||||
await f.write(line)
|
||||
local += 1
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
_log("presign_error", request_id=request_id, key=item, error=str(exc))
|
||||
```
|
||||
|
||||
**Improvement 5 — per-item error isolation.** The original let a single `generate_presigned_url` failure propagate out of the consumer and crash the whole batch. Here, each item is wrapped in its own `try/except`. A bad key increments `errors` and logs the details; the consumer continues with the next key. The caller receives `{"count": N, "errors": M}` and can decide whether to retry the M failures.
|
||||
|
||||
`local` counts successful presigns per consumer. The N return values are summed after `asyncio.gather` to get the total count.
|
||||
|
||||
The `write_lock` serialises writes to the shared JSONL file — without it, concurrent `await f.write(line)` calls would interleave partial lines.
|
||||
|
||||
---
|
||||
|
||||
## Block 6 — N concurrent consumers (`asyncio.gather`)
|
||||
|
||||
```python
|
||||
prod_task = asyncio.create_task(producer())
|
||||
counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
|
||||
await prod_task
|
||||
```
|
||||
|
||||
**Improvement 8 — concurrent consumers.** The original ran one consumer. Here `asyncio.gather` starts N consumer coroutines simultaneously, all reading from the same queue. Since `generate_presigned_url` is an async S3 API call, each consumer yields while waiting for the response, letting the others proceed. With N=4 and 1000 PDFs, peak in-flight presign requests is 4 instead of 1 — throughput roughly scales with `concurrency` up to the point where the producer can't keep the queue full.
|
||||
|
||||
`asyncio.create_task(producer())` schedules the producer to run concurrently with the consumers rather than sequentially before them. The `await prod_task` after gather ensures any producer exception is re-raised (not silently swallowed as it would be in a fire-and-forget task).
|
||||
|
||||
---
|
||||
|
||||
## Block 7 — Streaming manifest upload
|
||||
|
||||
```python
|
||||
manifest_bytes = os.path.getsize(manifest_path)
|
||||
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
|
||||
|
||||
with open(manifest_path, "rb") as f:
|
||||
await s3.put_object(
|
||||
Bucket=cfg["bucket"], Key=manifest_key, Body=f,
|
||||
ContentType="application/x-ndjson",
|
||||
)
|
||||
```
|
||||
|
||||
**Improvement 6 — streaming upload.** The original read the entire manifest into memory with `await f.read()` before uploading. For a bucket with 100k PDFs, the JSONL manifest could be tens of MB — a full `read()` doubles peak RAM usage. Passing a sync file handle as `Body` lets aiobotocore read the file in chunks internally. Memory usage stays flat regardless of manifest size.
|
||||
|
||||
`os.path.getsize` is called before opening the file for upload so the size is available for the EMF block even after the file is deleted.
|
||||
|
||||
---
|
||||
|
||||
## Block 8 — Cleanup, metrics, and response
|
||||
|
||||
```python
|
||||
os.unlink(manifest_path)
|
||||
|
||||
result = {
|
||||
"count": count,
|
||||
"errors": errors,
|
||||
"manifest_key": manifest_key,
|
||||
"manifest_url": manifest_url,
|
||||
}
|
||||
response_bytes = len(json.dumps(result))
|
||||
duration_ms = (time.monotonic() - t0) * 1000
|
||||
|
||||
_log("complete", request_id=request_id, count=count, errors=errors,
|
||||
pages=pages, duration_ms=round(duration_ms, 2))
|
||||
_emit_emf({
|
||||
"PDFsProcessed": count, "S3ListPages": pages,
|
||||
"PresignCount": count, "ManifestBytes": manifest_bytes,
|
||||
"ResponseBytes": response_bytes,
|
||||
}, Function="sign_pdfs_optimized")
|
||||
|
||||
return result
|
||||
```
|
||||
|
||||
`os.unlink` removes the temp file after upload. `/tmp` is shared across warm invocations — not cleaning up leaks storage toward the 512 MB (default) or 10 GB (max) `/tmp` cap.
|
||||
|
||||
`response_bytes = len(json.dumps(result))` captures how large the sync response would be. Lambda's synchronous response limit is 6 MB. This function returns a URL, not a body, so `response_bytes` is typically under 300 bytes — but measuring it makes the headroom concrete.
|
||||
|
||||
The final `_log("complete", ...)` and `_emit_emf(...)` emit together at the end so all counts are final. In the tester, the Structured logs panel shows the `complete` event and the EMF panel shows the five metric values.
|
||||
|
||||
---
|
||||
|
||||
## Block 9 — Entrypoint
|
||||
|
||||
```python
|
||||
def handler(event, context):
|
||||
request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
|
||||
result = asyncio.run(_run(event, request_id))
|
||||
return {"statusCode": 200, "body": json.dumps(result)}
|
||||
```
|
||||
|
||||
`asyncio.run` creates a fresh event loop for each invocation. Lambda's Python runtime is synchronous at the handler boundary; `asyncio.run` is the correct bridge between the sync entrypoint and the async internals. A module-level `loop = asyncio.get_event_loop()` would work on the first invocation but is deprecated and unreliable across warm invocations.
|
||||
|
||||
`getattr(context, "aws_request_id", ...)` falls back to a generated UUID when the context object doesn't have the attribute — which is the case in the local tester, where the runner passes a minimal stub.
|
||||
|
||||
---
|
||||
|
||||
## Summary of improvements
|
||||
|
||||
| # | What changed | Why it matters |
|
||||
|---|---|---|
|
||||
| 1 | Config from event, env as fallback | One function serves many prefixes |
|
||||
| 2 | Structured JSON logging (`_log`) | Queryable in Logs Insights; visible in tester |
|
||||
| 3 | EMF metrics (`_emit_emf`) | Free CloudWatch metrics, no API call overhead |
|
||||
| 4 | Producer `try/finally` with N sentinels | Consumers always exit; handler never hangs |
|
||||
| 5 | Per-item `try/except` in consumer | One bad key doesn't crash the batch |
|
||||
| 6 | Sync file handle as `Body` | Flat memory for any manifest size |
|
||||
| 7 | `PageSize=1000` | Fewer S3 round-trips on large prefixes |
|
||||
| 8 | N concurrent consumers via `asyncio.gather` | Presign throughput scales with `concurrency` |
|
||||
1
functions/sign_pdfs/events/default.json
Normal file
1
functions/sign_pdfs/events/default.json
Normal file
@@ -0,0 +1 @@
|
||||
{}
|
||||
@@ -23,11 +23,13 @@ async def _run():
|
||||
|
||||
async def producer():
|
||||
paginator = s3.get_paginator("list_objects_v2")
|
||||
try:
|
||||
async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}):
|
||||
for obj in page.get("Contents", []) or []:
|
||||
key = obj["Key"]
|
||||
if key.lower().endswith(".pdf"):
|
||||
await queue.put(key)
|
||||
finally:
|
||||
await queue.put(_DONE)
|
||||
|
||||
async def consumer():
|
||||
6
functions/sign_pdfs/requirements.txt
Normal file
6
functions/sign_pdfs/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
# Deps for the sign_pdfs lambda. Bundled into its deployment zip when
|
||||
# uploading to AWS; locally, the runner pod installs the union of all
|
||||
# per-function requirements (see Dockerfile.lambda).
|
||||
aioboto3>=15.0 # async S3 client used in handler.py
|
||||
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp
|
||||
boto3>=1.40 # sync S3 client used by seed.py (data setup utility)
|
||||
1
functions/sign_pdfs_optimized/events/default.json
Normal file
1
functions/sign_pdfs_optimized/events/default.json
Normal file
@@ -0,0 +1 @@
|
||||
{}
|
||||
6
functions/sign_pdfs_optimized/events/tuned.json
Normal file
6
functions/sign_pdfs_optimized/events/tuned.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"bucket": "my-company-reports-bucket",
|
||||
"prefix": "2026/04/",
|
||||
"page_size": 1000,
|
||||
"concurrency": 4
|
||||
}
|
||||
165
functions/sign_pdfs_optimized/handler.py
Normal file
165
functions/sign_pdfs_optimized/handler.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""sign_pdfs_optimized — production-refined fork of sign_pdfs.
|
||||
|
||||
Same contract as the original (def handler(event, context) → {"statusCode": 200, "body": ...})
|
||||
but with 8 production refinements applied side-by-side for the demo:
|
||||
|
||||
1. Config from the event payload (bucket / prefix / expiry / page_size / concurrency),
|
||||
env vars are fallback only. Same function can serve many prefixes.
|
||||
2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms.
|
||||
3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount,
|
||||
ManifestBytes, ResponseBytes.
|
||||
4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel
|
||||
per consumer, not a single _DONE).
|
||||
5. Consumer per-item try/except — failed presigns count into errors, batch survives.
|
||||
6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM.
|
||||
7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes.
|
||||
8. N concurrent consumers via asyncio.gather — presign throughput scales with
|
||||
the concurrency knob in the event.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import aioboto3
|
||||
import aiofiles
|
||||
|
||||
_DONE = object()
|
||||
|
||||
|
||||
def _cfg(event: dict) -> dict:
|
||||
return {
|
||||
"bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
|
||||
"prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"),
|
||||
"expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
|
||||
"endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
|
||||
"page_size": int(event.get("page_size") or 1000),
|
||||
"concurrency": int(event.get("concurrency") or 4),
|
||||
"queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
|
||||
}
|
||||
|
||||
|
||||
def _log(event_type: str, **fields):
|
||||
print(json.dumps({"event": event_type, **fields}))
|
||||
|
||||
|
||||
def _emit_emf(metrics: dict, **dims):
|
||||
print(json.dumps({
|
||||
"_aws": {
|
||||
"Timestamp": int(time.time() * 1000),
|
||||
"CloudWatchMetrics": [{
|
||||
"Namespace": "eth/sign_pdfs_optimized",
|
||||
"Dimensions": [list(dims.keys())],
|
||||
"Metrics": [
|
||||
{"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
|
||||
for k in metrics
|
||||
],
|
||||
}],
|
||||
},
|
||||
**dims, **metrics,
|
||||
}))
|
||||
|
||||
|
||||
async def _run(event: dict, request_id: str):
    """List PDFs under cfg["prefix"], presign each key, stream a JSONL manifest
    to S3, and return {"count", "errors", "manifest_key", "manifest_url"}.

    Producer/consumer over a bounded asyncio.Queue: one producer paginates
    ListObjectsV2, cfg["concurrency"] consumers presign keys concurrently.
    """
    cfg = _cfg(event)
    _log("start", request_id=request_id, bucket=cfg["bucket"], prefix=cfg["prefix"],
         page_size=cfg["page_size"], concurrency=cfg["concurrency"])
    t0 = time.monotonic()  # monotonic clock: immune to wall-clock adjustments

    pages = 0   # S3 list pages seen (producer); reported in EMF as S3ListPages
    errors = 0  # failed presigns (consumers); batch survives individual failures
    session = aioboto3.Session()
    async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
        # Bounded queue: the producer can buffer at most queue_max keys ahead
        # of the consumers before back-pressure blocks it.
        queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"])
        manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"  # unique per invocation

        async def producer():
            nonlocal pages
            try:
                paginator = s3.get_paginator("list_objects_v2")
                async for page in paginator.paginate(
                    Bucket=cfg["bucket"], Prefix=cfg["prefix"],
                    PaginationConfig={"PageSize": cfg["page_size"]},
                ):
                    pages += 1
                    # "Contents" may be absent/None on empty pages — `or []` guards both.
                    for obj in page.get("Contents", []) or []:
                        key = obj["Key"]
                        if key.lower().endswith(".pdf"):
                            await queue.put(key)
            finally:
                # one sentinel per consumer so each gather() task exits cleanly;
                # runs on success, exception, or cancellation.
                for _ in range(cfg["concurrency"]):
                    await queue.put(_DONE)

        write_lock = asyncio.Lock()  # serialises manifest writes — no interleaved partial lines
        async with aiofiles.open(manifest_path, "w") as f:

            async def consumer():
                nonlocal errors
                local = 0  # successful presigns by this consumer; summed after gather
                while True:
                    item = await queue.get()
                    if item is _DONE:
                        return local
                    try:
                        url = await s3.generate_presigned_url(
                            "get_object",
                            Params={"Bucket": cfg["bucket"], "Key": item},
                            ExpiresIn=cfg["expiry"],
                        )
                        line = json.dumps({"key": item, "url": url}) + "\n"
                        async with write_lock:
                            await f.write(line)
                        local += 1
                    except Exception as exc:
                        # Per-item isolation: one bad key doesn't crash the batch.
                        errors += 1
                        _log("presign_error", request_id=request_id, key=item, error=str(exc))

            prod_task = asyncio.create_task(producer())
            counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
            # Re-raise any producer exception (gather above awaited only consumers).
            await prod_task

        count = sum(counts)
        manifest_bytes = os.path.getsize(manifest_path)  # captured before the temp file is deleted
        manifest_key = f"manifests/{uuid.uuid4()}.jsonl"

        # Sync file handle as Body — aiobotocore reads in chunks instead of buffering
        # the whole manifest in memory like `body = await f.read()` would.
        with open(manifest_path, "rb") as f:
            await s3.put_object(
                Bucket=cfg["bucket"], Key=manifest_key, Body=f,
                ContentType="application/x-ndjson",
            )

        manifest_url = await s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": cfg["bucket"], "Key": manifest_key},
            ExpiresIn=cfg["expiry"],
        )
        os.unlink(manifest_path)  # /tmp persists across warm invocations — clean up

        result = {
            "count": count,
            "errors": errors,
            "manifest_key": manifest_key,
            "manifest_url": manifest_url,
        }
        # Headroom check against Lambda's 6 MB synchronous response limit.
        response_bytes = len(json.dumps(result))
        duration_ms = (time.monotonic() - t0) * 1000

        # Emit summary log + EMF once, at the end, so all counts are final.
        _log("complete", request_id=request_id, count=count, errors=errors,
             pages=pages, duration_ms=round(duration_ms, 2))
        _emit_emf({
            "PDFsProcessed": count, "S3ListPages": pages,
            "PresignCount": count, "ManifestBytes": manifest_bytes,
            "ResponseBytes": response_bytes,
        }, Function="sign_pdfs_optimized")

        return result
|
||||
|
||||
|
||||
def handler(event, context):
    """Sync Lambda entrypoint: bridge into the async pipeline via asyncio.run.

    Falls back to a generated UUID when the context object lacks
    aws_request_id (the local tester passes a minimal stub).
    """
    fallback_id = str(uuid.uuid4())
    request_id = getattr(context, "aws_request_id", fallback_id)
    body = asyncio.run(_run(event, request_id))
    return {"statusCode": 200, "body": json.dumps(body)}
|
||||
3
functions/sign_pdfs_optimized/requirements.txt
Normal file
3
functions/sign_pdfs_optimized/requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
# Same deps as sign_pdfs/ — this is the "after" demo fork, same dep footprint.
|
||||
aioboto3>=15.0 # async S3 client
|
||||
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp
|
||||
56
invoke.py
56
invoke.py
@@ -1,15 +1,65 @@
|
||||
"""Shared CLI invoker for any function in functions/.
|
||||
|
||||
Usage (inside the lambda pod, via `bash ctrl/invoke.sh [name [event_json]]`):
|
||||
python invoke.py # invokes the only function, or the first found
|
||||
python invoke.py lambda_function # specific function
|
||||
python invoke.py lambda_function '{}' # with event payload
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Defaults match the in-cluster configmap so behavior is identical whether
|
||||
# this script is invoked directly or via the FastAPI runner. `setdefault`
|
||||
# never overrides an existing env var — the configmap wins in the pod.
|
||||
os.environ.setdefault("BUCKET_NAME", "my-company-reports-bucket")
|
||||
os.environ.setdefault("PREFIX", "2026/04/")
|
||||
os.environ.setdefault("S3_ENDPOINT_URL", "http://localhost:9000")
|
||||
os.environ.setdefault("S3_ENDPOINT_URL", "http://minio:9000")
|
||||
os.environ.setdefault("AWS_ACCESS_KEY_ID", "minioadmin")
|
||||
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "minioadmin")
|
||||
os.environ.setdefault("AWS_REGION", "us-east-1")
|
||||
|
||||
from lambda_function import handler # noqa: E402
|
||||
REPO_ROOT = Path(__file__).parent
|
||||
FUNCTIONS_DIR = Path(os.environ.get("FUNCTIONS_DIR", str(REPO_ROOT / "functions")))
|
||||
SHARED_DIR = REPO_ROOT / "shared"
|
||||
|
||||
# Make shared/ importable from any handler ("from shared import ..."). Matches
|
||||
# how a Lambda Layer would expose code on PYTHONPATH at runtime.
|
||||
if SHARED_DIR.exists() and str(REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
|
||||
def _pick_default_function() -> str:
    """Return the alphabetically-first function folder that contains handler.py.

    Exits with status 2 when no eligible folder exists under FUNCTIONS_DIR.
    """
    names = [
        entry.name
        for entry in FUNCTIONS_DIR.iterdir()
        if entry.is_dir() and not entry.name.startswith("_") and (entry / "handler.py").exists()
    ]
    names.sort()
    if not names:
        print(f"no function folders with handler.py in {FUNCTIONS_DIR}", file=sys.stderr)
        sys.exit(2)
    return names[0]
|
||||
|
||||
|
||||
def _load(name: str):
    """Import functions/<name>/handler.py as a module and return it.

    Exits with status 2 when the file is missing or defines no
    handler(event, context).
    """
    path = FUNCTIONS_DIR / name / "handler.py"
    if not path.exists():
        print(f"function not found: {path}", file=sys.stderr)
        sys.exit(2)
    # Load by file path (not package import) so function folders need no __init__.py.
    spec = importlib.util.spec_from_file_location(f"functions.{name}.handler", path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    if hasattr(mod, "handler"):
        return mod
    print(f"{path} has no handler(event, context)", file=sys.stderr)
    sys.exit(2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
response = handler({}, None)
|
||||
name = sys.argv[1] if len(sys.argv) > 1 else _pick_default_function()
|
||||
event = json.loads(sys.argv[2]) if len(sys.argv) > 2 else {}
|
||||
module = _load(name)
|
||||
response = module.handler(event, None)
|
||||
print(json.dumps(response, indent=2))
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
aioboto3>=15.0
|
||||
aiofiles>=23.2
|
||||
boto3>=1.40
|
||||
# Root requirements = runner + invoker deps only. Per-function deps live
|
||||
# alongside each function (functions/<name>/requirements.txt) so they're
|
||||
# bundled with the function's deployment zip for AWS.
|
||||
fastapi>=0.115
|
||||
uvicorn[standard]>=0.32
|
||||
|
||||
432
runner.py
Normal file
432
runner.py
Normal file
@@ -0,0 +1,432 @@
|
||||
"""Local Lambda runner — FastAPI wrapper that invokes any `handler(event, context)`
|
||||
file in /app and reports AWS-equivalent metrics. Nothing in this file is touched
|
||||
by the lambda function itself; functions stay verbatim-uploadable to AWS.
|
||||
|
||||
Features that are scaffolded now and "light up" later when matching improvements
|
||||
land in the function:
|
||||
- event payload pass-through (improvement #1: BUCKET/PREFIX from event)
|
||||
- structured JSON log capture (improvement #2: JSON logging to stdout)
|
||||
- EMF metric extraction (improvement #3: CloudWatch EMF embedded metrics)
|
||||
Until the function emits those, the corresponding output fields are empty.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import io
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import resource
|
||||
import subprocess
|
||||
import sys
|
||||
import sysconfig
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
import zipfile
|
||||
from contextlib import redirect_stderr, redirect_stdout
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# All three are env-overridable so the runner works outside the pod too
# (e.g. FUNCTIONS_DIR=./functions uvicorn runner:app for a host-local run).
FUNCTIONS_DIR = Path(os.environ.get("FUNCTIONS_DIR", "/app/functions"))
SHARED_DIR = Path(os.environ.get("SHARED_DIR", "/app/shared"))
MAX_INVOCATIONS = int(os.environ.get("RUNNER_MAX_INVOCATIONS", "200"))

# Make shared/ importable for any function. Mirrors AWS Lambda Layer behavior
# (layer code is added to PYTHONPATH for all functions that attach it).
# The parent goes on sys.path so handlers write "from shared import ...".
if SHARED_DIR.exists():
    _repo_root = str(SHARED_DIR.parent)
    if _repo_root not in sys.path:
        sys.path.insert(0, _repo_root)

app = FastAPI(title="Lambda Local Runner")

# In-process state. Lives only as long as the uvicorn worker; --reload (or
# POST /reset) wipes it, which is exactly the cold-start semantics we want.
_modules: dict = {}  # name -> imported module (cache; presence = warm)
_module_deps: dict = {}  # name -> set of sys.modules keys added during this function's init
_invocations: list[dict] = []  # newest last; capped at MAX_INVOCATIONS
|
||||
|
||||
|
||||
class LambdaContext:
    """Minimal stand-in for the AWS Lambda context object.

    The function file doesn't use any of this today, but the shape matches
    the real context so future improvements (e.g. putting
    `context.aws_request_id` into structured logs) need no runner changes.
    """

    def __init__(self, request_id: str, function_name: str, memory_mb: int, timeout_ms: int):
        # Identity fields, shaped like the genuine AWS context attributes.
        self.aws_request_id = request_id
        self.function_name = function_name
        self.function_version = "$LATEST"
        self.invoked_function_arn = (
            f"arn:aws:lambda:local:000000000000:function:{function_name}"
        )
        self.memory_limit_in_mb = memory_mb
        # Log-destination names, mirroring CloudWatch conventions.
        self.log_group_name = f"/aws/lambda/{function_name}"
        self.log_stream_name = (
            f"local/{time.strftime('%Y/%m/%d')}/[$LATEST]{request_id}"
        )
        # Absolute deadline on the monotonic clock, in milliseconds.
        self._deadline_ms = timeout_ms + time.monotonic() * 1000

    def get_remaining_time_in_millis(self) -> int:
        """Milliseconds left until the (soft) deadline, clamped at zero."""
        remaining = self._deadline_ms - time.monotonic() * 1000
        return max(0, int(remaining))
|
||||
|
||||
|
||||
class InvokeRequest(BaseModel):
    """Request body for POST /invoke/{name}."""

    # Lambda event payload, handed verbatim to handler(event, context).
    event: dict = Field(default_factory=dict)
    # AWS Lambda memory sizes: 128, 256, 512, 1024, 1536, 2048, 3008, 5120, 10240
    # NOTE(review): recorded into context/metrics only — local RAM is not
    # actually capped by this value.
    memory_mb: int = 128
    # AWS Lambda timeout: 1-900 seconds. Locally we record it but don't kill
    # the handler (matches "function works verbatim" — no signal interruption).
    timeout_ms: int = 30_000
|
||||
|
||||
|
||||
class ScriptRequest(BaseModel):
    """Request body for POST /scripts/{fn_name}/{script_name}."""

    # Extra argv appended after the interpreter and script path.
    args: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
@app.get("/functions")
def list_functions():
    """Scan FUNCTIONS_DIR for deployable functions.

    A function is any non-underscore subfolder whose handler.py contains a
    `def handler(event, context):` signature — the same folder-per-function
    shape an AWS Lambda deployment package uses. Sample events under
    events/*.json are listed so the UI can offer a dropdown.
    """
    if not FUNCTIONS_DIR.exists():
        return {"functions": [], "functions_dir": str(FUNCTIONS_DIR), "error": "directory not found"}

    found: list[dict] = []
    for folder in sorted(FUNCTIONS_DIR.iterdir()):
        if folder.name.startswith("_") or not folder.is_dir():
            continue
        handler_file = folder / "handler.py"
        if not handler_file.exists():
            continue
        try:
            source = handler_file.read_text()
        except Exception:
            continue
        signatures = ("def handler(event, context)", "def handler(event,context)")
        if not any(sig in source for sig in signatures):
            continue
        events_dir = folder / "events"
        sample_events = (
            sorted(p.name for p in events_dir.glob("*.json"))
            if events_dir.is_dir()
            else []
        )
        found.append({"name": folder.name, "events": sample_events})
    return {"functions": found, "functions_dir": str(FUNCTIONS_DIR)}
|
||||
|
||||
|
||||
@app.get("/functions/{name}/events/{filename}")
def get_event(name: str, filename: str):
    """Serve a sample event file (functions/<name>/events/<filename>).

    Returns the parsed JSON body so the UI can preview/select it.
    Raises 404 when the file is missing and 400 when it is not valid JSON.
    """
    # BUG FIX: the route template contained a literal "(unknown)" segment
    # instead of the {filename} placeholder, so the `filename` parameter
    # declared in the signature could never bind and the endpoint 500'd.
    path = FUNCTIONS_DIR / name / "events" / filename
    if not path.exists() or not path.is_file():
        raise HTTPException(status_code=404, detail="event file not found")
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"invalid JSON in {path}: {e}")
|
||||
|
||||
|
||||
@app.post("/invoke/{name}")
def invoke(name: str, req: InvokeRequest):
    """Invoke a handler. Sync def so FastAPI runs us in a thread — that lets
    the function call `asyncio.run(...)` internally without nested-loop errors
    (which is how the current handler.py works).

    Flow: resolve handler file → (cold start only) import + time the import →
    run handler with stdout/stderr captured → attach AWS-style metrics plus
    parsed structured-log / EMF output → store and return the full record.
    """
    target = FUNCTIONS_DIR / name / "handler.py"
    if not target.exists():
        raise HTTPException(status_code=404, detail=f"{target} not found")

    invocation_id = str(uuid.uuid4())
    # Warmth is defined purely by module-cache presence; POST /reset empties it.
    cold_start = name not in _modules
    init_duration_ms = None
    # One record per invocation — this exact dict is what /invocations/{id} serves.
    record: dict = {
        "invocation_id": invocation_id,
        "function": name,
        "timestamp": time.time(),
        "event": req.event,
        "result": None,
        "error": None,
        "stdout": "",
        "stderr": "",
        "structured_logs": [],  # filled from captured stdout after the run
        "emf_metrics": [],  # filled from captured stdout after the run
        "metrics": {
            "cold_start": cold_start,
            "init_duration_ms": None,  # stays None on warm invocations
            "duration_ms": 0.0,
            "billed_duration_ms": 0,
            "memory_size_mb": req.memory_mb,
            "max_memory_used_mb": 0.0,
        },
    }

    # Cold-start: import the module and time the import. This matches AWS's
    # "Init Duration" — time to load module-level code (imports + module-scope
    # statements). On warm invocations this whole block is skipped.
    if cold_start:
        spec = importlib.util.spec_from_file_location(
            f"functions.{name}.handler", target,
        )
        module = importlib.util.module_from_spec(spec)
        # Snapshot sys.modules so /reset can pop the transitive deps this function
        # pulled in (aioboto3 → aiobotocore → botocore, etc.). Without this the
        # second cold start is fake — heavy imports stay cached in the long-running
        # uvicorn process, and Force Cold reports unrealistically small numbers.
        sys_modules_before = set(sys.modules)
        t0 = time.monotonic()
        try:
            spec.loader.exec_module(module)
        except Exception as e:
            # Init failed: record init time + error and skip the handler entirely.
            init_duration_ms = (time.monotonic() - t0) * 1000
            record["error"] = _format_exception(e)
            record["metrics"]["init_duration_ms"] = round(init_duration_ms, 2)
            return _record(record)
        init_duration_ms = (time.monotonic() - t0) * 1000
        _modules[name] = module
        _module_deps[name] = set(sys.modules) - sys_modules_before
        record["metrics"]["init_duration_ms"] = round(init_duration_ms, 2)
    module = _modules[name]

    if not hasattr(module, "handler"):
        raise HTTPException(status_code=400, detail=f"{name}.py has no handler() function")

    context = LambdaContext(
        request_id=invocation_id,
        function_name=name,
        memory_mb=req.memory_mb,
        timeout_ms=req.timeout_ms,
    )

    # Capture everything the handler prints; structured logs and EMF metrics
    # are parsed out of the captured stdout below.
    stdout_buf = io.StringIO()
    stderr_buf = io.StringIO()
    t_handler = time.monotonic()
    try:
        with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
            result = module.handler(req.event, context)
            # Defensive: if a future async handler ever returns a coroutine
            # (AWS doesn't support that natively, but we might), run it.
            if asyncio.iscoroutine(result):
                result = asyncio.run(result)
            record["result"] = result
    except Exception as e:
        record["error"] = _format_exception(e)
    duration_ms = (time.monotonic() - t_handler) * 1000

    # ru_maxrss: kilobytes on Linux, bytes on macOS. We run on Linux in kind.
    # NOTE(review): this is the process's lifetime peak RSS, not this
    # invocation's delta — it can only grow across invocations.
    rusage = resource.getrusage(resource.RUSAGE_SELF)
    max_memory_mb = rusage.ru_maxrss / 1024

    record["stdout"] = stdout_buf.getvalue()
    record["stderr"] = stderr_buf.getvalue()
    record["structured_logs"] = _extract_json_logs(record["stdout"])
    record["emf_metrics"] = _extract_emf_metrics(record["stdout"])
    record["metrics"]["duration_ms"] = round(duration_ms, 2)
    # AWS bills per started millisecond → ceil, like "Billed Duration" in logs.
    record["metrics"]["billed_duration_ms"] = int(math.ceil(duration_ms))
    record["metrics"]["max_memory_used_mb"] = round(max_memory_mb, 2)
    return _record(record)
|
||||
|
||||
|
||||
@app.get("/invocations")
def list_invocations(limit: int = 50):
    """Index of past invocations, newest first.

    Lightweight summary only — fetch /invocations/{id} for the full record.
    """
    summaries = [
        {
            "invocation_id": rec["invocation_id"],
            "function": rec["function"],
            "timestamp": rec["timestamp"],
            "cold_start": rec["metrics"]["cold_start"],
            "duration_ms": rec["metrics"]["duration_ms"],
            "init_duration_ms": rec["metrics"]["init_duration_ms"],
            "max_memory_used_mb": rec["metrics"]["max_memory_used_mb"],
            "ok": rec["error"] is None,
        }
        for rec in reversed(_invocations[-limit:])
    ]
    return {"invocations": summaries, "total": len(_invocations)}
|
||||
|
||||
|
||||
@app.get("/invocations/{invocation_id}")
def get_invocation(invocation_id: str):
    """Return the full record for one invocation, or 404 if unknown."""
    hit = next(
        (rec for rec in _invocations if rec["invocation_id"] == invocation_id),
        None,
    )
    if hit is None:
        raise HTTPException(status_code=404, detail="invocation not found")
    return hit
|
||||
|
||||
|
||||
@app.delete("/invocations")
def clear_invocations():
    """Drop every stored invocation record and report how many were removed."""
    removed = len(_invocations)
    _invocations.clear()
    return {"cleared": removed}
|
||||
|
||||
|
||||
@app.post("/reset")
def reset_modules():
    """Force the next invocation of every function to be a realistic cold start.

    Clears the module cache AND pops the transitive imports each function
    pulled in during its init (aioboto3 → aiobotocore → botocore, ...), so
    re-import actually hits the disk rather than an already-warm uvicorn
    process's sys.modules.
    """
    names = list(_modules.keys())
    dropped = 0
    for fn_name in names:
        # The handler module itself is registered under a package-style key.
        sys.modules.pop(f"functions.{fn_name}.handler", None)
        for dep_key in _module_deps.pop(fn_name, ()):
            if sys.modules.pop(dep_key, None) is not None:
                dropped += 1
    _modules.clear()
    return {"cleared": names, "transitive_modules_popped": dropped}
|
||||
|
||||
|
||||
@app.get("/functions/{name}/scripts")
def list_scripts(name: str):
    """List support scripts for a function — any .py file that isn't handler.py."""
    folder = FUNCTIONS_DIR / name
    if not folder.is_dir():
        raise HTTPException(status_code=404, detail=f"function {name!r} not found")
    excluded = ("handler.py", "__init__.py")
    names = [f.name for f in sorted(folder.glob("*.py")) if f.name not in excluded]
    return {"scripts": names, "function": name}
|
||||
|
||||
|
||||
@app.post("/scripts/{fn_name}/{script_name}")
def run_script(fn_name: str, script_name: str, req: ScriptRequest):
    """Run a support script from functions/<fn_name>/<script_name> with optional args.

    Both path parameters arrive from an untrusted HTTP client, so both are
    validated against path traversal before touching the filesystem. Output
    and return code are captured; the subprocess is hard-killed after 300 s.
    """
    # SECURITY FIX: previously only script_name was checked, so a request
    # with fn_name=".." could resolve a script outside FUNCTIONS_DIR.
    if ".." in fn_name or "/" in fn_name:
        raise HTTPException(status_code=400, detail="invalid function name")
    if ".." in script_name or "/" in script_name:
        raise HTTPException(status_code=400, detail="invalid script name")
    if not script_name.endswith(".py"):
        raise HTTPException(status_code=400, detail="only .py scripts allowed")
    script_path = FUNCTIONS_DIR / fn_name / script_name
    if not script_path.exists():
        raise HTTPException(status_code=404, detail=f"{script_path} not found")
    # List form (shell=False) — arguments are never shell-interpreted.
    cmd = [sys.executable, str(script_path)] + list(req.args)
    t0 = time.monotonic()
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        return {"returncode": -1, "stdout": "", "stderr": "timed out after 300 s",
                "duration_ms": 300_000.0}
    return {
        "returncode": result.returncode,
        "stdout": result.stdout,
        "stderr": result.stderr,
        "duration_ms": round((time.monotonic() - t0) * 1000, 2),
    }
|
||||
|
||||
|
||||
@app.get("/packaging")
def packaging():
    """Static sizing report against the live pod filesystem.

    Reports what each function would ship as a Lambda deployment zip, what
    the shared layer would weigh, and the largest installed dependencies —
    measured, not extrapolated — alongside the relevant AWS Lambda limits.
    """
    # Per-function folder sizes, raw and zipped.
    function_rows = []
    for folder in sorted(FUNCTIONS_DIR.iterdir()):
        handler_file = folder / "handler.py"
        if not folder.is_dir() or folder.name.startswith("_"):
            continue
        if not handler_file.exists():
            continue
        function_rows.append({
            "name": folder.name,
            "handler_bytes": handler_file.stat().st_size,
            "folder_bytes": _dir_bytes(folder),
            "folder_zip_bytes": _zip_bytes(folder),
        })

    # Installed third-party packages above ~50 KB, largest first.
    site_pkgs = _site_packages_dir()
    dep_rows: list[dict] = []
    if site_pkgs:
        for entry in sorted(site_pkgs.iterdir()):
            if not entry.is_dir():
                continue
            if entry.name.startswith("_") or entry.name.endswith(".dist-info"):
                continue
            size = _dir_bytes(entry)
            if size > 50_000:
                dep_rows.append({"name": entry.name, "bytes": size})
    dep_rows.sort(key=lambda row: row["bytes"], reverse=True)

    has_shared = SHARED_DIR.exists()
    shared_raw = _dir_bytes(SHARED_DIR) if has_shared else 0
    shared_zipped = _zip_bytes(SHARED_DIR) if has_shared else 0

    mb = 1024 * 1024
    return {
        "functions": function_rows,
        "dependencies": dep_rows[:25],
        "dependencies_total_bytes": sum(row["bytes"] for row in dep_rows),
        "shared_layer": {"bytes": shared_raw, "zip_bytes": shared_zipped},
        # Published AWS Lambda quotas, in bytes.
        "limits": {
            "zip_upload_max": 50 * mb,
            "unzipped_max": 250 * mb,
            "container_image_max": 10 * 1024 * mb,
            "tmp_default": 512 * mb,
            "tmp_max": 10 * 1024 * mb,
            "response_max": 6 * mb,
        },
    }
|
||||
|
||||
|
||||
def _dir_bytes(path: Path) -> int:
|
||||
total = 0
|
||||
for p in path.rglob("*"):
|
||||
if p.is_file():
|
||||
try:
|
||||
total += p.stat().st_size
|
||||
except OSError:
|
||||
pass
|
||||
return total
|
||||
|
||||
|
||||
def _zip_bytes(path: Path) -> int:
|
||||
"""Compute deflate-zipped size without writing to disk."""
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
for p in path.rglob("*"):
|
||||
if p.is_file():
|
||||
try:
|
||||
zf.write(p, p.relative_to(path))
|
||||
except OSError:
|
||||
pass
|
||||
return buf.getbuffer().nbytes
|
||||
|
||||
|
||||
def _site_packages_dir() -> Path | None:
|
||||
purelib = sysconfig.get_paths().get("purelib")
|
||||
return Path(purelib) if purelib and Path(purelib).exists() else None
|
||||
|
||||
|
||||
@app.get("/health")
def health():
    """Liveness probe plus a peek at warm modules and stored invocation count."""
    return {
        "ok": True,
        "loaded_modules": list(_modules),
        "invocations": len(_invocations),
    }
|
||||
|
||||
|
||||
def _format_exception(e: BaseException) -> dict:
|
||||
return {
|
||||
"type": type(e).__name__,
|
||||
"message": str(e),
|
||||
"traceback": traceback.format_exc(),
|
||||
}
|
||||
|
||||
|
||||
def _extract_json_logs(stdout_text: str) -> list[dict]:
|
||||
"""Parse JSON-per-line structured logs out of stdout. Fails silently —
|
||||
until the function emits structured logs (improvement #2), this returns []."""
|
||||
logs: list[dict] = []
|
||||
for line in stdout_text.splitlines():
|
||||
line = line.strip()
|
||||
if not (line.startswith("{") and line.endswith("}")):
|
||||
continue
|
||||
try:
|
||||
logs.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return logs
|
||||
|
||||
|
||||
def _extract_emf_metrics(stdout_text: str) -> list[dict]:
    """Pick CloudWatch EMF metric records out of captured stdout.

    EMF shape: {"_aws": {"CloudWatchMetrics": [...], "Timestamp": ...},
    "<metric>": value, ...}. Fails silently — until the function emits EMF
    (improvement #3), this returns [].
    """
    return [
        entry
        for entry in _extract_json_logs(stdout_text)
        if isinstance(entry.get("_aws"), dict) and "CloudWatchMetrics" in entry["_aws"]
    ]
|
||||
|
||||
|
||||
def _record(rec: dict) -> dict:
    """Append *rec* to the invocation ring buffer and return it.

    Oldest entries are trimmed once the buffer exceeds MAX_INVOCATIONS.
    """
    _invocations.append(rec)
    overflow = len(_invocations) - MAX_INVOCATIONS
    if overflow > 0:
        del _invocations[:overflow]
    return rec
|
||||
21
shared/README.md
Normal file
21
shared/README.md
Normal file
@@ -0,0 +1,21 @@
|
||||
# shared/
|
||||
|
||||
Cross-function Python modules — the local equivalent of an AWS Lambda Layer.
|
||||
|
||||
Anything in this directory is available to every function under
|
||||
`functions/<name>/handler.py` as a regular import:
|
||||
|
||||
```python
|
||||
# functions/sign_pdfs/handler.py
|
||||
from shared import common_utils
|
||||
```
|
||||
|
||||
The `shared/` directory is added to `sys.path` by the runner. For an AWS
|
||||
deploy, you'd either:
|
||||
|
||||
1. Package it as a real Lambda Layer (preferred), and reference the layer ARN
|
||||
from each function's deploy spec, **or**
|
||||
2. Vendor a copy into each function's deployment zip (simpler, less DRY).
|
||||
|
||||
Currently empty — no cross-function code yet. First candidate would probably
|
||||
be a structured-logging helper once multiple functions need it.
|
||||
Reference in New Issue
Block a user