diff --git a/Makefile b/Makefile index cc4ce11..253421d 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,13 @@ -.PHONY: up down seed invoke install logs console clean graphs docs kind tilt-up tilt-down +.PHONY: install up down clean kind tilt-up tilt-down seed seed-aws invoke sam-build sam-deploy sam-invoke -PY ?= .venv/bin/python +PY := .venv/bin/python COMPOSE := docker compose -f ctrl/docker-compose.yml -DOT_SRC := $(wildcard docs/graphs/*.dot) -SVG_OUT := $(DOT_SRC:.dot=.svg) install: uv sync up: $(COMPOSE) up -d - @echo "MinIO API: http://localhost:9000" - @echo "MinIO console: http://localhost:9001 (minioadmin / minioadmin)" down: $(COMPOSE) down @@ -19,9 +15,6 @@ down: clean: $(COMPOSE) down -v -logs: - $(COMPOSE) logs -f minio - kind: bash ctrl/kind-config.sh @@ -32,20 +25,19 @@ tilt-down: cd ctrl && tilt down --context kind-eth seed: - @if [ -z "$$SOURCE_DIR" ]; then echo "set SOURCE_DIR="; exit 2; fi - $(PY) seed.py "$$SOURCE_DIR" + bash ctrl/seed.sh + +seed-aws: + bash ctrl/seed_aws.sh invoke: $(PY) invoke.py -console: - xdg-open http://localhost:9001 >/dev/null 2>&1 || true +sam-build: + sam build -docs/graphs/%.svg: docs/graphs/%.dot - dot -Tsvg $< -o $@ +sam-deploy: sam-build + sam deploy -graphs: $(SVG_OUT) - @echo "rendered $(words $(SVG_OUT)) svg(s) from $(words $(DOT_SRC)) dot file(s)" - -docs: $(SVG_OUT) - xdg-open docs/index.html >/dev/null 2>&1 || echo "open docs/index.html in your browser" +sam-invoke: + aws lambda invoke --function-name eth-demo-sign-pdfs --payload '{}' --cli-binary-format raw-in-base64-out /tmp/out.json && cat /tmp/out.json diff --git a/ctrl/seed_aws.sh b/ctrl/seed_aws.sh new file mode 100755 index 0000000..65eb7c1 --- /dev/null +++ b/ctrl/seed_aws.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Uploads eth/data/*.pdf to the demo S3 bucket. Curate eth/data/ by hand +# first — the script never reads ~/Documents and has no filter logic of its +# own. Idempotent: aws s3 sync skips files already present at the same size. +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +DATA_DIR="${DATA_DIR:-$REPO_ROOT/data}" +PREFIX="${PREFIX:-2026/04/}" +BUCKET="${BUCKET_NAME:-eth-demo-reports-$(aws sts get-caller-identity --query Account --output text)}" + +if [ ! -d "$DATA_DIR" ]; then + echo "no such directory: $DATA_DIR" >&2 + exit 2 +fi + +pdf_count=$(find "$DATA_DIR" -maxdepth 1 -name "*.pdf" | wc -l) +if [ "$pdf_count" -eq 0 ]; then + echo "no .pdf files in $DATA_DIR — curate first" >&2 + exit 2 +fi + +echo "syncing $pdf_count PDFs from $DATA_DIR to s3://${BUCKET}/${PREFIX}" +aws s3 sync "$DATA_DIR" "s3://${BUCKET}/${PREFIX}" \ + --exclude "*" --include "*.pdf" + +echo +echo "done. verify with:" +echo " aws s3 ls s3://${BUCKET}/${PREFIX} --human-readable | tail" diff --git a/functions/sign_pdfs/handler.py b/functions/sign_pdfs/handler.py index 061c902..306c455 100644 --- a/functions/sign_pdfs/handler.py +++ b/functions/sign_pdfs/handler.py @@ -22,6 +22,7 @@ Production refinements vs sign_pdfs_v1 (see docs/lambdas-md/lambda-20-sign-pdfs. 8. N concurrent consumers via asyncio.gather — presign throughput scales with the concurrency knob in the event. """ + import asyncio import json import os @@ -35,12 +36,17 @@ _DONE = object() def _cfg(event: dict) -> dict: + # bucket and prefix are required — fail loud if neither event nor env supplies + # them. The function deliberately has no fallback default (no "2026/04/" baked + # in) because that's a deployment-time concern, not handler code. return { - "bucket": event.get("bucket") or os.environ.get("BUCKET_NAME", "my-company-reports-bucket"), - "prefix": event.get("prefix") or os.environ.get("PREFIX", "2026/04/"), - "expiry": int(event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900")), + "bucket": event.get("bucket") or os.environ["BUCKET_NAME"], + "prefix": event.get("prefix") or os.environ["PREFIX"], # e.g. "2026/04/" + "expiry": int( + event.get("expiry_seconds") or os.environ.get("URL_EXPIRY_SECONDS", "900") + ), "endpoint": os.environ.get("S3_ENDPOINT_URL") or None, - "page_size": int(event.get("page_size") or 1000), + "page_size": int(event.get("page_size") or 100), "concurrency": int(event.get("concurrency") or 4), "queue_max": int(os.environ.get("QUEUE_MAX", "2000")), } @@ -51,26 +57,42 @@ def _log(event_type: str, **fields): def _emit_emf(metrics: dict, **dims): - print(json.dumps({ - "_aws": { - "Timestamp": int(time.time() * 1000), - "CloudWatchMetrics": [{ - "Namespace": "eth/sign_pdfs", - "Dimensions": [list(dims.keys())], - "Metrics": [ - {"Name": k, "Unit": "Bytes" if k.endswith("Bytes") else "Count"} - for k in metrics - ], - }], - }, - **dims, **metrics, - })) + print( + json.dumps( + { + "_aws": { + "Timestamp": int(time.time() * 1000), + "CloudWatchMetrics": [ + { + "Namespace": "eth/sign_pdfs", + "Dimensions": [list(dims.keys())], + "Metrics": [ + { + "Name": k, + "Unit": "Bytes" if k.endswith("Bytes") else "Count", + } + for k in metrics + ], + } + ], + }, + **dims, + **metrics, + } + ) + ) async def _run(event: dict, request_id: str): cfg = _cfg(event) - _log("start", request_id=request_id, bucket=cfg["bucket"], prefix=cfg["prefix"], - page_size=cfg["page_size"], concurrency=cfg["concurrency"]) + _log( + "start", + request_id=request_id, + bucket=cfg["bucket"], + prefix=cfg["prefix"], + page_size=cfg["page_size"], + concurrency=cfg["concurrency"], + ) t0 = time.monotonic() pages = 0 @@ -85,7 +107,8 @@ async def _run(event: dict, request_id: str): try: paginator = s3.get_paginator("list_objects_v2") async for page in paginator.paginate( - Bucket=cfg["bucket"], Prefix=cfg["prefix"], + Bucket=cfg["bucket"], + Prefix=cfg["prefix"], PaginationConfig={"PageSize": cfg["page_size"]}, ): pages += 1 @@ -120,10 +143,17 @@ async def _run(event: dict, request_id: str): local += 1 except Exception as exc: errors += 1 - _log("presign_error", request_id=request_id, key=item, error=str(exc)) + _log( + "presign_error", + request_id=request_id, + key=item, + error=str(exc), + ) prod_task = asyncio.create_task(producer()) - counts = await asyncio.gather(*(consumer() for _ in range(cfg["concurrency"]))) + counts = await asyncio.gather( + *(consumer() for _ in range(cfg["concurrency"])) + ) await prod_task count = sum(counts) @@ -134,7 +164,9 @@ async def _run(event: dict, request_id: str): # the whole manifest in memory like `body = await f.read()` would. with open(manifest_path, "rb") as f: await s3.put_object( - Bucket=cfg["bucket"], Key=manifest_key, Body=f, + Bucket=cfg["bucket"], + Key=manifest_key, + Body=f, ContentType="application/x-ndjson", ) @@ -154,13 +186,24 @@ async def _run(event: dict, request_id: str): response_bytes = len(json.dumps(result)) duration_ms = (time.monotonic() - t0) * 1000 - _log("complete", request_id=request_id, count=count, errors=errors, - pages=pages, duration_ms=round(duration_ms, 2)) - _emit_emf({ - "PDFsProcessed": count, "S3ListPages": pages, - "PresignCount": count, "ManifestBytes": manifest_bytes, - "ResponseBytes": response_bytes, - }, Function="sign_pdfs") + _log( + "complete", + request_id=request_id, + count=count, + errors=errors, + pages=pages, + duration_ms=round(duration_ms, 2), + ) + _emit_emf( + { + "PDFsProcessed": count, + "S3ListPages": pages, + "PresignCount": count, + "ManifestBytes": manifest_bytes, + "ResponseBytes": response_bytes, + }, + Function="sign_pdfs", + ) return result diff --git a/template.yaml b/template.yaml index 88412aa..187a0cc 100644 --- a/template.yaml +++ b/template.yaml @@ -2,6 +2,17 @@ AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: eth-demo — Lambda + Step Functions demo +Parameters: + Prefix: + Type: String + Default: 2026/04/ + Description: >- + S3 key prefix the function scans for PDFs. Matches the seed script's + default. Override with --parameter-overrides Prefix=other/ to target a + different prefix without changing the template. Trailing slash required. + AllowedPattern: ".+/" + ConstraintDescription: "must end with '/' (e.g. 2026/04/)" + Globals: Function: Runtime: python3.13 @@ -34,11 +45,30 @@ Resources: Type: AWS::Serverless::Function Properties: FunctionName: eth-demo-sign-pdfs - CodeUri: functions/stub/ + CodeUri: functions/sign_pdfs/ Handler: handler.handler LoggingConfig: LogFormat: JSON LogGroup: !Ref SignPdfsLogGroup + Environment: + Variables: + BUCKET_NAME: !Ref ReportsBucket + PREFIX: !Ref Prefix + URL_EXPIRY_SECONDS: "900" + Policies: + - Statement: + - Sid: ListReportsBucket + Effect: Allow + Action: s3:ListBucket + Resource: !GetAtt ReportsBucket.Arn + - Sid: ReadReports + Effect: Allow + Action: s3:GetObject + Resource: !Sub "${ReportsBucket.Arn}/*" + - Sid: WriteManifests + Effect: Allow + Action: s3:PutObject + Resource: !Sub "${ReportsBucket.Arn}/manifests/*" Outputs: ReportsBucketName: