diff --git a/functions/sign_pdfs/handler.py b/functions/sign_pdfs/handler.py index 306c455..cdeca61 100644 --- a/functions/sign_pdfs/handler.py +++ b/functions/sign_pdfs/handler.py @@ -24,6 +24,7 @@ Production refinements vs sign_pdfs_v1 (see docs/lambdas-md/lambda-20-sign-pdfs. """ import asyncio +import hashlib import json import os import time @@ -35,6 +36,14 @@ import aiofiles _DONE = object() +def _dedup_key(cfg: dict) -> str: + # Identity of a logical job: same (bucket, prefix) = same job. Hashing keeps + # the DDB key bounded regardless of prefix length and avoids leaking the + # bucket/prefix into the dedup key in cleartext. + raw = f"{cfg['bucket']}#{cfg['prefix']}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + def _cfg(event: dict) -> dict: # bucket and prefix are required — fail loud if neither event nor env supplies # them. The function deliberately has no fallback default (no "2026/04/" baked @@ -95,9 +104,41 @@ async def _run(event: dict, request_id: str): ) t0 = time.monotonic() + session = aioboto3.Session() + dedup_table = os.environ.get("DEDUP_TABLE") + dedup_key = _dedup_key(cfg) + + # Idempotency check — return the cached result if (bucket, prefix) was + # processed within the TTL window. Regenerate the manifest URL so the + # cached response is always usable, even after the original URL expired. + if dedup_table: + async with session.resource("dynamodb") as ddb: + table = await ddb.Table(dedup_table) + try: + resp = await table.get_item(Key={"id": dedup_key}) + except Exception as exc: + _log("dedup_lookup_error", request_id=request_id, error=str(exc)) + resp = {} + if "Item" in resp: + cached = json.loads(resp["Item"]["result"]) + async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3: + cached["manifest_url"] = await s3.generate_presigned_url( + "get_object", + Params={"Bucket": cfg["bucket"], "Key": cached["manifest_key"]}, + ExpiresIn=cfg["expiry"], + ) + cached["idempotent"] = True + _log( + "cache_hit", + request_id=request_id, + dedup_key=dedup_key, + count=cached["count"], + duration_ms=round((time.monotonic() - t0) * 1000, 2), + ) + return cached + pages = 0 errors = 0 - session = aioboto3.Session() async with session.client("s3", endpoint_url=cfg["endpoint"]) as s3: queue: asyncio.Queue = asyncio.Queue(maxsize=cfg["queue_max"]) manifest_path = f"/tmp/{uuid.uuid4()}.jsonl" @@ -205,6 +246,22 @@ async def _run(event: dict, request_id: str): Function="sign_pdfs", ) + if dedup_table: + async with session.resource("dynamodb") as ddb: + table = await ddb.Table(dedup_table) + try: + await table.put_item( + Item={ + "id": dedup_key, + "result": json.dumps(result), + "ttl": int(time.time()) + 86400, # 24h + } + ) + except Exception as exc: + # cache write failure is non-critical — the actual work + # succeeded. Log it so we know if dedup is broken. + _log("dedup_write_error", request_id=request_id, error=str(exc)) + return result diff --git a/template.yaml b/template.yaml index 187a0cc..a50615f 100644 --- a/template.yaml +++ b/template.yaml @@ -41,6 +41,21 @@ Resources: LogGroupName: /aws/lambda/eth-demo-sign-pdfs RetentionInDays: 7 + DedupTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub ${AWS::StackName}-sign-pdfs-dedup + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: id + AttributeType: S + KeySchema: + - AttributeName: id + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + SignPdfsFunction: Type: AWS::Serverless::Function Properties: @@ -55,6 +70,7 @@ Resources: BUCKET_NAME: !Ref ReportsBucket PREFIX: !Ref Prefix URL_EXPIRY_SECONDS: "900" + DEDUP_TABLE: !Ref DedupTable Policies: - Statement: - Sid: ListReportsBucket @@ -69,6 +85,12 @@ Resources: Effect: Allow Action: s3:PutObject Resource: !Sub "${ReportsBucket.Arn}/manifests/*" + - Sid: DedupTableAccess + Effect: Allow + Action: + - dynamodb:GetItem + - dynamodb:PutItem + Resource: !GetAtt DedupTable.Arn Outputs: ReportsBucketName: