DynamoDB Stream with Lambda, SQS, and Lambda
Integrating Amazon DynamoDB with AWS Lambda and Amazon Simple Queue Service (SQS) provides a robust solution for processing data changes in real time. If you use DynamoDB, you will typically attach a stream to capture changes to the table. In practice, however, a DynamoDB stream supports at most two concurrent consumers per shard, which limits how much Lambda concurrency you can point directly at the stream. This architecture is particularly useful when handling high volumes of data changes while ensuring that downstream processing scales effectively: Lambda provides event-driven execution, and SQS decouples the components and absorbs the processing load. The setup is further enhanced by deploying on the ARM (Graviton) platform for cost and efficiency gains, and by using the AWS Lambda PowerTools library for improved observability and operational excellence.
Typical users of this pattern have a high volume of data changes in a DynamoDB table and want to process them in real time, for example to update a search index, send notifications, or trigger other workflows. Healthcare, financial services, and e-commerce companies are common examples.
Content Covered
- Creating a DynamoDB table
- Creating a Lambda function
- Creating an SQS queue
- Creating a Lambda function to process the SQS queue
- Example Python code for Lambda
Demo Architecture Stack
This small stack has a DynamoDB table, a Lambda function that processes the DynamoDB stream, an SQS queue, and a second Lambda function that processes the SQS queue.
AWS CDK Script (TypeScript)
First, we need to create a new CDK stack. The following example demonstrates how to create a new CDK stack with a DynamoDB table, an SQS queue, and two Lambda functions. The first Lambda function processes the DynamoDB stream and writes messages to the SQS queue, while the second Lambda function processes the SQS queue.
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sources from 'aws-cdk-lib/aws-lambda-event-sources';

export class MyStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create the DynamoDB table with a stream capturing new and old images
    const table = new dynamodb.Table(this, 'MyTable', {
      partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // Create the SQS queue (a standard queue)
    const queue = new sqs.Queue(this, 'MyQueue');

    // Create the first Lambda function (DynamoDB Stream processor)
    const dynamoLambda = new lambda.Function(this, 'DynamoLambda', {
      runtime: lambda.Runtime.PYTHON_3_12,
      architecture: lambda.Architecture.ARM_64, // Graviton for cost/efficiency
      handler: 'app.handler',
      code: lambda.Code.fromAsset('lambda'),
      memorySize: 256,
      environment: {
        QUEUE_URL: queue.queueUrl,
      },
    });

    // Grant the Lambda function permission to write to the SQS queue
    queue.grantSendMessages(dynamoLambda);

    // Set the DynamoDB stream as an event source; report individual record
    // failures so PowerTools' partial batch response is honored
    dynamoLambda.addEventSource(new sources.DynamoEventSource(table, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
      reportBatchItemFailures: true,
    }));

    // Create the second Lambda function (SQS processor)
    const sqsLambda = new lambda.Function(this, 'SqsLambda', {
      runtime: lambda.Runtime.PYTHON_3_12,
      architecture: lambda.Architecture.ARM_64,
      handler: 'sqs_app.handler', // separate handler from the stream processor
      code: lambda.Code.fromAsset('lambda'),
      memorySize: 256,
    });

    // Set the SQS queue as an event source
    sqsLambda.addEventSource(new sources.SqsEventSource(queue, {
      reportBatchItemFailures: true,
    }));
  }
}
Lambda Function (Python)
AWS Lambda PowerTools is a suite of utilities for AWS Lambda functions that makes it easier to implement best practices for logging, monitoring, and tracing in serverless applications. It simplifies tasks such as structured logging, metrics collection, distributed tracing across microservices, and batch processing with partial-failure handling. This section shows how AWS Lambda PowerTools fits into a DynamoDB-Lambda-SQS-Lambda architecture.
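As a quick, standalone illustration (the service name and extra keys below are arbitrary), the PowerTools Logger emits structured JSON lines rather than plain text, which makes logs easy to query in CloudWatch Logs Insights:

from aws_lambda_powertools import Logger

# Standalone sketch; "demo" is an arbitrary service name
logger = Logger(service="demo")

logger.info("record forwarded", extra={"table": "MyTable", "queue": "MyQueue"})
# Emits a single JSON line, roughly:
# {"level": "INFO", "message": "record forwarded", "service": "demo",
#  "table": "MyTable", "queue": "MyQueue", "timestamp": "..."}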
app.py
import os

from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

from lib.queue import MediaQueue

# With EventType.DynamoDBStreams, PowerTools hands each record to
# record_handler as a DynamoDBRecord data class
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger(service="spaces_data_table_stream", level="DEBUG")

# QUEUE_URL is injected by the CDK stack above
media_queue = MediaQueue(logger, os.environ["QUEUE_URL"])


def record_handler(record: DynamoDBRecord):
    # raw_event is the plain dict behind the data class wrapper
    return media_queue.send_stream_data(record.raw_event)


@logger.inject_lambda_context(clear_state=True)
def handler(event, context: LambdaContext):
    logger.debug(event)
    # Return the partial-batch-failure response so that only failed
    # records are retried by the event source mapping
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
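For reference, the raw dict behind each DynamoDBRecord (what send_stream_data receives) follows the DynamoDB Streams record format; here is a minimal INSERT record with illustrative values:

# Illustrative DynamoDB Streams record for an INSERT (values are made up)
sample_record = {
    "eventID": "1",
    "eventName": "INSERT",
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
        "Keys": {"id": {"S": "abc-123"}},
        "NewImage": {"id": {"S": "abc-123"}},
        "SequenceNumber": "4421584500000000017450439091",
        "SizeBytes": 26,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}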
/lib/queue.py
import json
import time
import traceback

import boto3
from botocore.exceptions import ClientError

sqs = boto3.resource("sqs")


class MediaQueue:
    def __init__(self, logger, media_queue_url: str) -> None:
        self.logger = logger
        # boto3's Queue resource is addressed by its URL
        self.media_queue = sqs.Queue(media_queue_url)

    def send_message(self, body: dict) -> tuple[int, str]:
        # MyQueue is a standard queue, so MessageGroupId and
        # MessageDeduplicationId (FIFO-only parameters) are not set
        try:
            self.media_queue.send_message(MessageBody=json.dumps(body))
            return 200, "Message sent to queue"
        except ClientError as e:
            if e.response["Error"]["Code"] == "ThrottlingException":
                # Back off briefly, then retry
                time.sleep(1)
                return self.send_message(body)
            traceback.print_exc()
            self.logger.error(e)
            return 500, str(e)
        except Exception as e:
            traceback.print_exc()
            self.logger.error(e)
            return 500, str(e)

    def send_stream_data(self, record: dict) -> tuple[int, str]:
        message = {
            "action": record.get("eventName", "unknown"),
            "body": record.get("dynamodb", {}),
        }
        return self.send_message(message)
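The second Lambda function (the SQS processor) is where your application-specific work happens. A minimal sketch could look like the following; the file name sqs_app.py matches the handler configured in the CDK stack, and the body of record_handler (which only logs the action) is a placeholder assumption:
sqs_app.py
import json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
logger = Logger(service="media_queue_processor", level="DEBUG")


def record_handler(record: SQSRecord):
    # The stream processor wrote {"action": ..., "body": ...}
    message = json.loads(record.body)
    logger.info({"action": message.get("action")})
    # Real work (update a search index, send a notification, ...) goes here


@logger.inject_lambda_context(clear_state=True)
def handler(event, context: LambdaContext):
    # Same partial-batch-failure pattern as the stream processor
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)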