From e297f97e180004dd1c1bf0313bdf0c38ce9921a4 Mon Sep 17 00:00:00 2001 From: buenosairesam Date: Mon, 18 May 2026 07:59:13 -0300 Subject: [PATCH] metadata pipeline via Step Functions --- functions/extract_metadata/handler.py | 65 ++++++++++++ functions/extract_metadata/requirements.txt | 3 + functions/list_pdfs/handler.py | 32 ++++++ functions/list_pdfs/requirements.txt | 1 + statemachines/pdf-index/definition.asl.json | 71 +++++++++++++ template.yaml | 108 ++++++++++++++++++++ 6 files changed, 280 insertions(+) create mode 100644 functions/extract_metadata/handler.py create mode 100644 functions/extract_metadata/requirements.txt create mode 100644 functions/list_pdfs/handler.py create mode 100644 functions/list_pdfs/requirements.txt create mode 100644 statemachines/pdf-index/definition.asl.json diff --git a/functions/extract_metadata/handler.py b/functions/extract_metadata/handler.py new file mode 100644 index 0000000..f8a6857 --- /dev/null +++ b/functions/extract_metadata/handler.py @@ -0,0 +1,65 @@ +"""Extract metadata from a single PDF in S3. + +Called once per PDF by the SFN Map state — input is one S3 key, output is +that PDF's metadata. Designed for parallel invocation. + +Input event: + {"key": "2026/04/document.pdf"} (bucket from BUCKET_NAME env var) + {"bucket": "...", "key": "..."} (full override) + +Output: + {"key": "...", "pages": N, "title": "..."|null, "author": "..."|null, + "size_bytes": N} + +A failed parse (corrupt PDF, unsupported encryption) returns a row with +pages=0 and an "error" field — the Map state continues with the rest; +the bad PDF shows up later in the aggregate as a parse error count. +""" +import io +import os + +import boto3 +from pypdf import PdfReader +from pypdf.errors import PdfReadError + +_s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL") or None) + + +def _clean(value) -> str | None: + if value is None: + return None + s = str(value).strip() + return s or None + + +def handler(event, context): + bucket = event.get("bucket") or os.environ["BUCKET_NAME"] + key = event["key"] + + obj = _s3.get_object(Bucket=bucket, Key=key) + body = obj["Body"].read() + size_bytes = len(body) + + try: + reader = PdfReader(io.BytesIO(body)) + pages = len(reader.pages) + info = reader.metadata or {} + title = _clean(info.get("/Title")) + author = _clean(info.get("/Author")) + except (PdfReadError, Exception) as exc: + return { + "key": key, + "pages": 0, + "title": None, + "author": None, + "size_bytes": size_bytes, + "error": f"{type(exc).__name__}: {exc}", + } + + return { + "key": key, + "pages": pages, + "title": title, + "author": author, + "size_bytes": size_bytes, + } diff --git a/functions/extract_metadata/requirements.txt b/functions/extract_metadata/requirements.txt new file mode 100644 index 0000000..a99327b --- /dev/null +++ b/functions/extract_metadata/requirements.txt @@ -0,0 +1,3 @@ +# pypdf is pure Python — small footprint, no native wheels needed for arm64. +# boto3 is already in the Lambda Python runtime, no need to bundle. +pypdf>=5.0 diff --git a/functions/list_pdfs/handler.py b/functions/list_pdfs/handler.py new file mode 100644 index 0000000..e08f6da --- /dev/null +++ b/functions/list_pdfs/handler.py @@ -0,0 +1,32 @@ +"""List every .pdf key under the configured bucket+prefix. + +Output: + {"keys": ["2026/04/a.pdf", ...], "count": N, "pages": N} + +Used as the first state in the metadata-index pipeline. The "keys" array +feeds an SFN Map state that runs ExtractMetadata in parallel per key. +""" +import os + +import boto3 + +_s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL") or None) + + +def handler(event, context): + bucket = event.get("bucket") or os.environ["BUCKET_NAME"] + prefix = event.get("prefix") or os.environ["PREFIX"] + + keys = [] + pages = 0 + paginator = _s3.get_paginator("list_objects_v2") + for page in paginator.paginate( + Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": 1000} + ): + pages += 1 + for obj in page.get("Contents", []) or []: + key = obj["Key"] + if key.lower().endswith(".pdf"): + keys.append(key) + + return {"keys": keys, "count": len(keys), "pages": pages} diff --git a/functions/list_pdfs/requirements.txt b/functions/list_pdfs/requirements.txt new file mode 100644 index 0000000..fc3045a --- /dev/null +++ b/functions/list_pdfs/requirements.txt @@ -0,0 +1 @@ +# boto3 is provided by the Lambda Python runtime — no deps to bundle. diff --git a/statemachines/pdf-index/definition.asl.json b/statemachines/pdf-index/definition.asl.json new file mode 100644 index 0000000..1109fe3 --- /dev/null +++ b/statemachines/pdf-index/definition.asl.json @@ -0,0 +1,71 @@ +{ + "Comment": "PDF metadata index pipeline — list keys, then per-key extract + DDB write in parallel.", + "StartAt": "ListPdfs", + "States": { + "ListPdfs": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${ListPdfsFunctionArn}", + "Payload": {} + }, + "ResultSelector": { + "keys.$": "$.Payload.keys", + "count.$": "$.Payload.count" + }, + "ResultPath": "$.list", + "Next": "ExtractAndIndex" + }, + "ExtractAndIndex": { + "Type": "Map", + "ItemsPath": "$.list.keys", + "MaxConcurrency": 10, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "ExtractMetadata", + "States": { + "ExtractMetadata": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${ExtractMetadataFunctionArn}", + "Payload": { + "key.$": "$" + } + }, + "ResultSelector": { + "key.$": "$.Payload.key", + "pages.$": "$.Payload.pages", + "size_bytes.$": "$.Payload.size_bytes" + }, + "Retry": [ + { + "ErrorEquals": ["States.ALL"], + "IntervalSeconds": 1, + "MaxAttempts": 2, + "BackoffRate": 2.0 + } + ], + "Next": "WriteToIndex" + }, + "WriteToIndex": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Parameters": { + "TableName": "${PdfIndexTableName}", + "Item": { + "key": { "S.$": "$.key" }, + "pages": { "N.$": "States.Format('{}', $.pages)" }, + "size_bytes": { "N.$": "States.Format('{}', $.size_bytes)" } + } + }, + "End": true + } + } + }, + "End": true + } + } +} diff --git a/template.yaml b/template.yaml index a50615f..7368806 100644 --- a/template.yaml +++ b/template.yaml @@ -56,6 +56,114 @@ Resources: AttributeName: ttl Enabled: true + PdfIndexTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub ${AWS::StackName}-pdf-index + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: key + AttributeType: S + KeySchema: + - AttributeName: key + KeyType: HASH + + ExtractMetadataLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: /aws/lambda/eth-demo-extract-metadata + RetentionInDays: 7 + + ExtractMetadataFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: eth-demo-extract-metadata + CodeUri: functions/extract_metadata/ + Handler: handler.handler + Timeout: 60 + MemorySize: 512 + LoggingConfig: + LogFormat: JSON + LogGroup: !Ref ExtractMetadataLogGroup + Environment: + Variables: + BUCKET_NAME: !Ref ReportsBucket + Policies: + - Statement: + - Sid: ReadPdf + Effect: Allow + Action: s3:GetObject + Resource: !Sub "${ReportsBucket.Arn}/*" + + ListPdfsLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: /aws/lambda/eth-demo-list-pdfs + RetentionInDays: 7 + + ListPdfsFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: eth-demo-list-pdfs + CodeUri: functions/list_pdfs/ + Handler: handler.handler + LoggingConfig: + LogFormat: JSON + LogGroup: !Ref ListPdfsLogGroup + Environment: + Variables: + BUCKET_NAME: !Ref ReportsBucket + PREFIX: !Ref Prefix + Policies: + - Statement: + - Sid: ListReportsBucket + Effect: Allow + Action: s3:ListBucket + Resource: !GetAtt ReportsBucket.Arn + + PdfIndexStateMachineLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub /aws/vendedlogs/states/${AWS::StackName}-pdf-index + RetentionInDays: 7 + + PdfIndexStateMachine: + Type: AWS::Serverless::StateMachine + Properties: + Name: !Sub ${AWS::StackName}-pdf-index + DefinitionUri: statemachines/pdf-index/definition.asl.json + DefinitionSubstitutions: + ListPdfsFunctionArn: !GetAtt ListPdfsFunction.Arn + ExtractMetadataFunctionArn: !GetAtt ExtractMetadataFunction.Arn + PdfIndexTableName: !Ref PdfIndexTable + Logging: + Destinations: + - CloudWatchLogsLogGroup: + LogGroupArn: !GetAtt PdfIndexStateMachineLogGroup.Arn + IncludeExecutionData: true + Level: ALL + Policies: + - LambdaInvokePolicy: + FunctionName: !Ref ListPdfsFunction + - LambdaInvokePolicy: + FunctionName: !Ref ExtractMetadataFunction + - DynamoDBWritePolicy: + TableName: !Ref PdfIndexTable + # SFN logging requires log-delivery API perms at account scope; SAM + # doesn't auto-add these even with Logging configured. + - Statement: + - Effect: Allow + Action: + - logs:CreateLogDelivery + - logs:GetLogDelivery + - logs:UpdateLogDelivery + - logs:DeleteLogDelivery + - logs:ListLogDeliveries + - logs:PutResourcePolicy + - logs:DescribeResourcePolicies + - logs:DescribeLogGroups + Resource: "*" + SignPdfsFunction: Type: AWS::Serverless::Function Properties: