metadata pipeline via Step Functions

This commit is contained in:
2026-05-18 07:59:13 -03:00
parent d3008676e0
commit e297f97e18
6 changed files with 280 additions and 0 deletions

View File

@@ -0,0 +1,65 @@
"""Extract metadata from a single PDF in S3.
Called once per PDF by the SFN Map state — input is one S3 key, output is
that PDF's metadata. Designed for parallel invocation.
Input event:
{"key": "2026/04/document.pdf"} (bucket from BUCKET_NAME env var)
{"bucket": "...", "key": "..."} (full override)
Output:
{"key": "...", "pages": N, "title": "..."|null, "author": "..."|null,
"size_bytes": N}
A failed parse (corrupt PDF, unsupported encryption) returns a row with
pages=0 and an "error" field — the Map state continues with the rest;
the bad PDF shows up later in the aggregate as a parse error count.
"""
import io
import os
import boto3
from pypdf import PdfReader
from pypdf.errors import PdfReadError
_s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL") or None)
def _clean(value) -> str | None:
if value is None:
return None
s = str(value).strip()
return s or None
def handler(event, context):
bucket = event.get("bucket") or os.environ["BUCKET_NAME"]
key = event["key"]
obj = _s3.get_object(Bucket=bucket, Key=key)
body = obj["Body"].read()
size_bytes = len(body)
try:
reader = PdfReader(io.BytesIO(body))
pages = len(reader.pages)
info = reader.metadata or {}
title = _clean(info.get("/Title"))
author = _clean(info.get("/Author"))
except (PdfReadError, Exception) as exc:
return {
"key": key,
"pages": 0,
"title": None,
"author": None,
"size_bytes": size_bytes,
"error": f"{type(exc).__name__}: {exc}",
}
return {
"key": key,
"pages": pages,
"title": title,
"author": author,
"size_bytes": size_bytes,
}

View File

@@ -0,0 +1,3 @@
# pypdf is pure Python — small footprint, no native wheels needed for arm64.
# boto3 is already in the Lambda Python runtime, no need to bundle.
pypdf>=5.0

View File

@@ -0,0 +1,32 @@
"""List every .pdf key under the configured bucket+prefix.
Output:
{"keys": ["2026/04/a.pdf", ...], "count": N, "pages": N}
Used as the first state in the metadata-index pipeline. The "keys" array
feeds an SFN Map state that runs ExtractMetadata in parallel per key.
"""
import os
import boto3
_s3 = boto3.client("s3", endpoint_url=os.environ.get("S3_ENDPOINT_URL") or None)
def handler(event, context):
bucket = event.get("bucket") or os.environ["BUCKET_NAME"]
prefix = event.get("prefix") or os.environ["PREFIX"]
keys = []
pages = 0
paginator = _s3.get_paginator("list_objects_v2")
for page in paginator.paginate(
Bucket=bucket, Prefix=prefix, PaginationConfig={"PageSize": 1000}
):
pages += 1
for obj in page.get("Contents", []) or []:
key = obj["Key"]
if key.lower().endswith(".pdf"):
keys.append(key)
return {"keys": keys, "count": len(keys), "pages": pages}

View File

@@ -0,0 +1 @@
# boto3 is provided by the Lambda Python runtime — no deps to bundle.

View File

@@ -0,0 +1,71 @@
{
"Comment": "PDF metadata index pipeline — list keys, then per-key extract + DDB write in parallel.",
"StartAt": "ListPdfs",
"States": {
"ListPdfs": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${ListPdfsFunctionArn}",
"Payload": {}
},
"ResultSelector": {
"keys.$": "$.Payload.keys",
"count.$": "$.Payload.count"
},
"ResultPath": "$.list",
"Next": "ExtractAndIndex"
},
"ExtractAndIndex": {
"Type": "Map",
"ItemsPath": "$.list.keys",
"MaxConcurrency": 10,
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "ExtractMetadata",
"States": {
"ExtractMetadata": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${ExtractMetadataFunctionArn}",
"Payload": {
"key.$": "$"
}
},
"ResultSelector": {
"key.$": "$.Payload.key",
"pages.$": "$.Payload.pages",
"size_bytes.$": "$.Payload.size_bytes"
},
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2.0
}
],
"Next": "WriteToIndex"
},
"WriteToIndex": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "${PdfIndexTableName}",
"Item": {
"key": { "S.$": "$.key" },
"pages": { "N.$": "States.Format('{}', $.pages)" },
"size_bytes": { "N.$": "States.Format('{}', $.size_bytes)" }
}
},
"End": true
}
}
},
"End": true
}
}
}

View File

@@ -56,6 +56,114 @@ Resources:
AttributeName: ttl
Enabled: true
PdfIndexTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub ${AWS::StackName}-pdf-index
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: key
AttributeType: S
KeySchema:
- AttributeName: key
KeyType: HASH
ExtractMetadataLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/lambda/eth-demo-extract-metadata
RetentionInDays: 7
ExtractMetadataFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: eth-demo-extract-metadata
CodeUri: functions/extract_metadata/
Handler: handler.handler
Timeout: 60
MemorySize: 512
LoggingConfig:
LogFormat: JSON
LogGroup: !Ref ExtractMetadataLogGroup
Environment:
Variables:
BUCKET_NAME: !Ref ReportsBucket
Policies:
- Statement:
- Sid: ReadPdf
Effect: Allow
Action: s3:GetObject
Resource: !Sub "${ReportsBucket.Arn}/*"
ListPdfsLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/lambda/eth-demo-list-pdfs
RetentionInDays: 7
ListPdfsFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: eth-demo-list-pdfs
CodeUri: functions/list_pdfs/
Handler: handler.handler
LoggingConfig:
LogFormat: JSON
LogGroup: !Ref ListPdfsLogGroup
Environment:
Variables:
BUCKET_NAME: !Ref ReportsBucket
PREFIX: !Ref Prefix
Policies:
- Statement:
- Sid: ListReportsBucket
Effect: Allow
Action: s3:ListBucket
Resource: !GetAtt ReportsBucket.Arn
PdfIndexStateMachineLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/vendedlogs/states/${AWS::StackName}-pdf-index
RetentionInDays: 7
PdfIndexStateMachine:
Type: AWS::Serverless::StateMachine
Properties:
Name: !Sub ${AWS::StackName}-pdf-index
DefinitionUri: statemachines/pdf-index/definition.asl.json
DefinitionSubstitutions:
ListPdfsFunctionArn: !GetAtt ListPdfsFunction.Arn
ExtractMetadataFunctionArn: !GetAtt ExtractMetadataFunction.Arn
PdfIndexTableName: !Ref PdfIndexTable
Logging:
Destinations:
- CloudWatchLogsLogGroup:
LogGroupArn: !GetAtt PdfIndexStateMachineLogGroup.Arn
IncludeExecutionData: true
Level: ALL
Policies:
- LambdaInvokePolicy:
FunctionName: !Ref ListPdfsFunction
- LambdaInvokePolicy:
FunctionName: !Ref ExtractMetadataFunction
- DynamoDBWritePolicy:
TableName: !Ref PdfIndexTable
# SFN logging requires log-delivery API perms at account scope; SAM
# doesn't auto-add these even with Logging configured.
- Statement:
- Effect: Allow
Action:
- logs:CreateLogDelivery
- logs:GetLogDelivery
- logs:UpdateLogDelivery
- logs:DeleteLogDelivery
- logs:ListLogDeliveries
- logs:PutResourcePolicy
- logs:DescribeResourcePolicies
- logs:DescribeLogGroups
Resource: "*"
SignPdfsFunction:
Type: AWS::Serverless::Function
Properties: