bootstrap SAM stack — S3 bucket + stub Lambda
This commit is contained in:
6
functions/sign_pdfs/events/tuned.json
Normal file
6
functions/sign_pdfs/events/tuned.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"bucket": "my-company-reports-bucket",
|
||||
"prefix": "2026/04/",
|
||||
"page_size": 1000,
|
||||
"concurrency": 4
|
||||
}
|
||||
@@ -1,81 +1,171 @@
|
||||
"""sign_pdfs — production handler.
|
||||
|
||||
Generates 15-minute presigned S3 URLs for every PDF under a bucket+prefix,
|
||||
writes a JSONL manifest to S3, returns the manifest URL.
|
||||
|
||||
This is the canonical version. functions/sign_pdfs_v1/ keeps the original
|
||||
naive implementation around for the local "before/after" comparison in the
|
||||
tester UI; only this one gets deployed to AWS.
|
||||
|
||||
Production refinements vs sign_pdfs_v1 (see docs/lambdas-md/lambda-20-sign-pdfs.md):
|
||||
|
||||
1. Config from the event payload (bucket / prefix / expiry / page_size / concurrency),
|
||||
env vars are fallback only. Same function can serve many prefixes.
|
||||
2. Structured JSON logging — request_id, bucket, prefix, count, pages, duration_ms.
|
||||
3. CloudWatch EMF metrics on stdout — PDFsProcessed, S3ListPages, PresignCount,
|
||||
ManifestBytes, ResponseBytes.
|
||||
4. Producer wraps the S3 loop in try/finally so consumers always exit (one sentinel
|
||||
per consumer, not a single _DONE).
|
||||
5. Consumer per-item try/except — failed presigns count into errors, batch survives.
|
||||
6. Manifest streamed to S3 (sync file handle as Body) — no full read into RAM.
|
||||
7. PageSize=1000 (S3 maximum) — fewer round-trips on large prefixes.
|
||||
8. N concurrent consumers via asyncio.gather — presign throughput scales with
|
||||
the concurrency knob in the event.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import aioboto3
|
||||
import aiofiles
|
||||
|
||||
BUCKET = os.environ.get("BUCKET_NAME", "my-company-reports-bucket")
|
||||
PREFIX = os.environ.get("PREFIX", "2026/04/")
|
||||
EXPIRY = int(os.environ.get("URL_EXPIRY_SECONDS", "900"))
|
||||
ENDPOINT = os.environ.get("S3_ENDPOINT_URL") or None
|
||||
QUEUE_MAX = int(os.environ.get("QUEUE_MAX", "2000"))
|
||||
|
||||
_DONE = object()
|
||||
|
||||
|
||||
async def _run():
|
||||
def _cfg(event: dict) -> dict:
|
||||
return {
|
||||
"bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"),
|
||||
"prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"),
|
||||
"expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")),
|
||||
"endpoint": os.environ.get("S3_ENDPOINT_URL") or None,
|
||||
"page_size": int(event.get("page_size") or 1000),
|
||||
"concurrency": int(event.get("concurrency") or 4),
|
||||
"queue_max": int(os.environ.get("QUEUE_MAX", "2000")),
|
||||
}
|
||||
|
||||
|
||||
def _log(event_type: str, **fields):
|
||||
print(json.dumps({"event": event_type, **fields}))
|
||||
|
||||
|
||||
def _emit_emf(metrics: dict, **dims):
|
||||
print(json.dumps({
|
||||
"_aws": {
|
||||
"Timestamp": int(time.time() * 1000),
|
||||
"CloudWatchMetrics": [{
|
||||
"Namespace": "eth/sign_pdfs",
|
||||
"Dimensions": [list(dims.keys())],
|
||||
"Metrics": [
|
||||
{"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"}
|
||||
for k in metrics
|
||||
],
|
||||
}],
|
||||
},
|
||||
**dims, **metrics,
|
||||
}))
|
||||
|
||||
|
||||
async def _run(event: dict, request_id: str):
|
||||
cfg = _cfg(event)
|
||||
_log("start", request_id=request_id, bucket=cfg["bucket"], prefix=cfg["prefix"],
|
||||
page_size=cfg["page_size"], concurrency=cfg["concurrency"])
|
||||
t0 = time.monotonic()
|
||||
|
||||
pages = 0
|
||||
errors = 0
|
||||
session = aioboto3.Session()
|
||||
async with session.client("s3", endpoint_url=ENDPOINT) as s3:
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAX)
|
||||
async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3:
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"])
|
||||
manifest_path = f"/tmp/{uuid.uuid4()}.jsonl"
|
||||
|
||||
async def producer():
|
||||
paginator = s3.get_paginator("list_objects_v2")
|
||||
nonlocal pages
|
||||
try:
|
||||
async for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, PaginationConfig={"PageSize": 100}):
|
||||
paginator = s3.get_paginator("list_objects_v2")
|
||||
async for page in paginator.paginate(
|
||||
Bucket=cfg["bucket"], Prefix=cfg["prefix"],
|
||||
PaginationConfig={"PageSize": cfg["page_size"]},
|
||||
):
|
||||
pages += 1
|
||||
for obj in page.get("Contents", []) or []:
|
||||
key = obj["Key"]
|
||||
if key.lower().endswith(".pdf"):
|
||||
await queue.put(key)
|
||||
finally:
|
||||
await queue.put(_DONE)
|
||||
# one sentinel per consumer so each gather() task exits cleanly
|
||||
for _ in range(cfg["concurrency"]):
|
||||
await queue.put(_DONE)
|
||||
|
||||
async def consumer():
|
||||
count = 0
|
||||
async with aiofiles.open(manifest_path, "w") as f:
|
||||
write_lock = asyncio.Lock()
|
||||
async with aiofiles.open(manifest_path, "w") as f:
|
||||
|
||||
async def consumer():
|
||||
nonlocal errors
|
||||
local = 0
|
||||
while True:
|
||||
item = await queue.get()
|
||||
if item is _DONE:
|
||||
break
|
||||
url = await s3.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": BUCKET, "Key": item},
|
||||
ExpiresIn=EXPIRY,
|
||||
)
|
||||
await f.write(json.dumps({"key": item, "url": url}) + "\n")
|
||||
count += 1
|
||||
return count
|
||||
return local
|
||||
try:
|
||||
url = await s3.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": cfg["bucket"], "Key": item},
|
||||
ExpiresIn=cfg["expiry"],
|
||||
)
|
||||
line = json.dumps({"key": item, "url": url}) + "\n"
|
||||
async with write_lock:
|
||||
await f.write(line)
|
||||
local += 1
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
_log("presign_error", request_id=request_id, key=item, error=str(exc))
|
||||
|
||||
prod_task = asyncio.create_task(producer())
|
||||
count = await consumer()
|
||||
await prod_task
|
||||
prod_task = asyncio.create_task(producer())
|
||||
counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"])))
|
||||
await prod_task
|
||||
|
||||
count = sum(counts)
|
||||
manifest_bytes = os.path.getsize(manifest_path)
|
||||
manifest_key = f"manifests/{uuid.uuid4()}.jsonl"
|
||||
async with aiofiles.open(manifest_path, "rb") as f:
|
||||
body = await f.read()
|
||||
await s3.put_object(
|
||||
Bucket=BUCKET,
|
||||
Key=manifest_key,
|
||||
Body=body,
|
||||
ContentType="application/x-ndjson",
|
||||
)
|
||||
|
||||
# Sync file handle as Body — aiobotocore reads in chunks instead of buffering
|
||||
# the whole manifest in memory like `body = await f.read()` would.
|
||||
with open(manifest_path, "rb") as f:
|
||||
await s3.put_object(
|
||||
Bucket=cfg["bucket"], Key=manifest_key, Body=f,
|
||||
ContentType="application/x-ndjson",
|
||||
)
|
||||
|
||||
manifest_url = await s3.generate_presigned_url(
|
||||
"get_object",
|
||||
Params={"Bucket": BUCKET, "Key": manifest_key},
|
||||
ExpiresIn=EXPIRY,
|
||||
Params={"Bucket": cfg["bucket"], "Key": manifest_key},
|
||||
ExpiresIn=cfg["expiry"],
|
||||
)
|
||||
|
||||
os.unlink(manifest_path)
|
||||
|
||||
return {
|
||||
result = {
|
||||
"count": count,
|
||||
"errors": errors,
|
||||
"manifest_key": manifest_key,
|
||||
"manifest_url": manifest_url,
|
||||
}
|
||||
response_bytes = len(json.dumps(result))
|
||||
duration_ms = (time.monotonic() - t0) * 1000
|
||||
|
||||
_log("complete", request_id=request_id, count=count, errors=errors,
|
||||
pages=pages, duration_ms=round(duration_ms, 2))
|
||||
_emit_emf({
|
||||
"PDFsProcessed": count, "S3ListPages": pages,
|
||||
"PresignCount": count, "ManifestBytes": manifest_bytes,
|
||||
"ResponseBytes": response_bytes,
|
||||
}, Function="sign_pdfs")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def handler(event, context):
|
||||
result = asyncio.run(_run())
|
||||
request_id = getattr(context, "aws_request_id", str(uuid.uuid4()))
|
||||
result = asyncio.run(_run(event, request_id))
|
||||
return {"statusCode": 200, "body": json.dumps(result)}
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
# Deps for the sign_pdfs lambda. Bundled into its deployment zip when
|
||||
# uploading to AWS; locally, the runner pod installs the union of all
|
||||
# per-function requirements (see Dockerfile.lambda).
|
||||
aioboto3>=15.0 # async S3 client used in handler.py
|
||||
# Same deps as sign_pdfs/ — this is the "after" demo fork, same dep footprint.
|
||||
aioboto3>=15.0 # async S3 client
|
||||
aiofiles>=23.2 # async file I/O for the JSONL manifest in /tmp
|
||||
boto3>=1.40 # sync S3 client used by seed.py (data setup utility)
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
BUCKET = os.environ.get("BUCKET_NAME", "my-company-reports-bucket")
|
||||
PREFIX = os.environ.get("PREFIX", "2026/04/")
|
||||
ENDPOINT = os.environ.get("S3_ENDPOINT_URL", "http://localhost:9000")
|
||||
DECOY_EXTS = ()
|
||||
|
||||
|
||||
def _client():
|
||||
return boto3.client(
|
||||
"s3",
|
||||
endpoint_url=ENDPOINT,
|
||||
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin"),
|
||||
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin"),
|
||||
region_name=os.environ.get("AWS_REGION", "us-east-1"),
|
||||
config=Config(signature_version="s3v4"),
|
||||
)
|
||||
|
||||
|
||||
def _ensure_bucket(s3, name):
|
||||
try:
|
||||
s3.head_bucket(Bucket=name)
|
||||
except ClientError:
|
||||
s3.create_bucket(Bucket=name)
|
||||
|
||||
|
||||
def _walk(source_dir):
|
||||
for root, _, files in os.walk(source_dir):
|
||||
for name in files:
|
||||
yield os.path.join(root, name)
|
||||
|
||||
|
||||
def main():
|
||||
source_dir = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("SOURCE_DIR")
|
||||
if not source_dir:
|
||||
print("usage: SOURCE_DIR=<path> python seed.py (or pass as argv[1])", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
if not os.path.isdir(source_dir):
|
||||
print(f"not a directory: {source_dir}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
s3 = _client()
|
||||
_ensure_bucket(s3, BUCKET)
|
||||
|
||||
pdf_n = decoy_n = 0
|
||||
for path in _walk(source_dir):
|
||||
lower = path.lower()
|
||||
is_pdf = lower.endswith(".pdf")
|
||||
is_decoy = lower.endswith(DECOY_EXTS)
|
||||
if not (is_pdf or is_decoy):
|
||||
continue
|
||||
|
||||
rel = os.path.relpath(path, source_dir).replace(os.sep, "/")
|
||||
key = f"{PREFIX}{rel}"
|
||||
try:
|
||||
s3.upload_file(path, BUCKET, key)
|
||||
except (ClientError, OSError) as exc:
|
||||
print(f" skip {path}: {exc}", file=sys.stderr)
|
||||
continue
|
||||
if is_pdf:
|
||||
pdf_n += 1
|
||||
else:
|
||||
decoy_n += 1
|
||||
if (pdf_n + decoy_n) % 100 == 0:
|
||||
print(f" uploaded {pdf_n} pdfs / {decoy_n} decoys ...")
|
||||
|
||||
print(f"done: {pdf_n} pdfs and {decoy_n} decoys uploaded to s3://{BUCKET}/{PREFIX}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user