DynamoDB Stream with Lambda, SQS and Lambda

Integrating Amazon DynamoDB with AWS Lambda and Amazon Simple Queue Service (SQS) provides a robust way to process data changes in near real time. DynamoDB Streams captures every item-level change in a table, and a Lambda function can consume that stream directly. In practice, however, a stream does not fan out well: AWS recommends that no more than two processes read from the same stream shard at the same time, because additional readers can be throttled. Forwarding the changes to an SQS queue removes that ceiling. The stream-processing function only has to relay each record, and the queue consumer scales independently of the stream. Lambda provides the event-driven execution, while SQS decouples the components and absorbs spikes in processing load. Running the functions on the Arm64 (AWS Graviton) architecture can lower cost, and Powertools for AWS Lambda adds structured logging and batch-failure handling for better observability and operations.
This pattern suits teams whose DynamoDB tables see a high volume of changes that must be acted on in near real time, for example to update a search index, send notifications, or trigger other workflows. Healthcare, financial services, and e-commerce companies are typical examples.

Content Covered

  • Creating a DynamoDB table
  • Creating a Lambda function to process the DynamoDB stream
  • Creating an SQS queue
  • Creating a Lambda function to process the SQS queue
  • Example Python code for Lambda

Demo Architecture stack

This small stack has a DynamoDB table with a stream enabled, a Lambda function that reads the stream and forwards each change to an SQS queue, the SQS queue itself, and a second Lambda function that processes the messages in the queue.

Figure: DynamoDB Stream with Lambda, SQS and Lambda (architecture diagram)
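To make the decoupling in this stack concrete, here is a minimal in-process simulation that uses only the Python standard library. One reader plays the role of the stream-processing Lambda and relays every change record to a queue.Queue, which stands in for SQS, while several worker threads play the SQS-triggered Lambda. The names stream_reader, worker, and run_pipeline are hypothetical, and real SQS and Lambda behavior (batching, visibility timeouts, retries, scaling) is not modeled.

```python
import queue
import threading


def stream_reader(records: list, q: queue.Queue, n_workers: int) -> None:
    """Single reader (like the stream Lambda): relays each change, in order."""
    for record in records:
        q.put(record)
    # One sentinel per worker tells every consumer to stop.
    for _ in range(n_workers):
        q.put(None)


def worker(q: queue.Queue, results: list, lock: threading.Lock) -> None:
    """Consumer (like the SQS Lambda): drains the queue independently of the reader."""
    while True:
        record = q.get()
        if record is None:
            break
        with lock:
            results.append(record["id"])


def run_pipeline(records: list, n_workers: int = 4) -> list:
    q: queue.Queue = queue.Queue()
    results: list = []
    lock = threading.Lock()
    workers = [threading.Thread(target=worker, args=(q, results, lock)) for _ in range(n_workers)]
    for t in workers:
        t.start()
    stream_reader(records, q, n_workers)
    for t in workers:
        t.join()
    return results


if __name__ == "__main__":
    changes = [{"id": str(i), "eventName": "INSERT"} for i in range(10)]
    print(sorted(run_pipeline(changes), key=int))
```

The reader does almost no work per record, which is what keeps the stream side within its reader limit; the number of consumers can grow without adding readers to the stream.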

AWS CDK Script (TypeScript)

First, define the infrastructure as a CDK stack. The following example creates a DynamoDB table, an SQS queue, and two Lambda functions: the first processes the DynamoDB stream and writes messages to the queue, and the second processes the messages in the queue.

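// AWS CDK v2 (aws-cdk-lib); CDK v1 is end-of-support and lacks the Python 3.12 runtime
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sources from 'aws-cdk-lib/aws-lambda-event-sources';

export class MyStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create the DynamoDB table with a stream carrying both old and new item images
    const table = new dynamodb.Table(this, 'MyTable', {
      partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    });

    // Create the SQS queue. It is FIFO because the Python code sets
    // MessageGroupId and MessageDeduplicationId, which only FIFO queues accept.
    const queue = new sqs.Queue(this, 'MyQueue', {
      fifo: true,
    });

    // Create the first Lambda function (DynamoDB Stream processor) on Arm64 (Graviton).
    // aws-lambda-powertools must be bundled in the asset or attached as a layer.
    const dynamoLambda = new lambda.Function(this, 'DynamoLambda', {
      runtime: lambda.Runtime.PYTHON_3_12,
      architecture: lambda.Architecture.ARM_64,
      handler: 'app.handler',
      code: lambda.Code.fromAsset('lambda'),
      memorySize: 256,
      environment: {
        QUEUE_URL: queue.queueUrl,
      },
    });

    // Grant the Lambda function permissions to write to the SQS queue
    queue.grantSendMessages(dynamoLambda);

    // Set the DynamoDB stream as an event source. reportBatchItemFailures is
    // required for process_partial_response to report individual failed records.
    dynamoLambda.addEventSource(new sources.DynamoEventSource(table, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
      reportBatchItemFailures: true,
    }));

    // Create the second Lambda function (SQS processor).
    // 'lambda_sqs' is a placeholder directory: the consumer needs its own code,
    // not the stream processor's app.py.
    const sqsLambda = new lambda.Function(this, 'SqsLambda', {
      runtime: lambda.Runtime.PYTHON_3_12,
      architecture: lambda.Architecture.ARM_64,
      handler: 'app.handler',
      code: lambda.Code.fromAsset('lambda_sqs'),
      memorySize: 256,
    });

    // Set the SQS queue as an event source, retrying only failed messages
    sqsLambda.addEventSource(new sources.SqsEventSource(queue, {
      reportBatchItemFailures: true,
    }));
  }
}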
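The stream property of the table decides which item images each stream record carries, and therefore what the first Lambda can forward. The sketch below encodes the four StreamViewType values DynamoDB offers and shows which image fields of a record's dynamodb payload each one delivers. The helper filter_for_view_type and the sample payload are illustrative only; a real payload also carries metadata such as SizeBytes and ApproximateCreationDateTime.

```python
# Which image fields each DynamoDB StreamViewType delivers.
# Keys are present for every view type.
VIEW_TYPE_FIELDS = {
    "KEYS_ONLY": {"Keys"},
    "NEW_IMAGE": {"Keys", "NewImage"},
    "OLD_IMAGE": {"Keys", "OldImage"},
    "NEW_AND_OLD_IMAGES": {"Keys", "NewImage", "OldImage"},
}
IMAGE_FIELDS = {"Keys", "NewImage", "OldImage"}


def filter_for_view_type(payload: dict, view_type: str) -> dict:
    """Hypothetical helper: drop the image fields a view type would not deliver,
    keeping non-image metadata such as SequenceNumber."""
    allowed = VIEW_TYPE_FIELDS[view_type]
    return {k: v for k, v in payload.items() if k not in IMAGE_FIELDS or k in allowed}


# Illustrative MODIFY payload with both images present
sample = {
    "Keys": {"id": {"S": "42"}},
    "OldImage": {"id": {"S": "42"}, "status": {"S": "pending"}},
    "NewImage": {"id": {"S": "42"}, "status": {"S": "done"}},
    "SequenceNumber": "111",
}

if __name__ == "__main__":
    print(sorted(filter_for_view_type(sample, "NEW_IMAGE")))
    # → ['Keys', 'NewImage', 'SequenceNumber']
```

NEW_AND_OLD_IMAGES is the most flexible choice for a relay like this one, since downstream consumers can compute what changed; it also makes each record, and therefore each SQS message, larger.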

Lambda Function (Python)

Powertools for AWS Lambda (formerly AWS Lambda Powertools) is a suite of utilities that makes it easier to apply serverless best practices such as structured logging, metrics collection, and tracing across services. In this DynamoDB-Lambda-SQS-Lambda architecture, the stream processor below uses two of them: Logger, for structured logs enriched with the Lambda context, and the batch processing utility (BatchProcessor with process_partial_response), which reports failed stream records individually so that Lambda resumes from the first failed record instead of retrying the entire batch.
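The idea behind partial batch responses can be shown without the library. The sketch below is a plain standard-library re-implementation of the contract, not Powertools' own code: it calls a handler per record, treats a record as failed only when the handler raises, and returns the {"batchItemFailures": [{"itemIdentifier": ...}]} shape that Lambda expects when ReportBatchItemFailures is enabled. The identifier is dynamodb.SequenceNumber for stream records and messageId for SQS messages. handle_batch is a hypothetical name.

```python
from typing import Callable


def handle_batch(event: dict, record_handler: Callable[[dict], None]) -> dict:
    """Run record_handler on each record and collect failures in Lambda's
    partial-batch-response format. Only a raised exception counts as a failure."""
    failures = []
    for record in event.get("Records", []):
        try:
            record_handler(record)
        except Exception:
            if record.get("eventSource") == "aws:sqs":
                item_id = record["messageId"]
            else:  # DynamoDB Streams record
                item_id = record["dynamodb"]["SequenceNumber"]
            failures.append({"itemIdentifier": item_id})
    return {"batchItemFailures": failures}


if __name__ == "__main__":
    def flaky_handler(record: dict) -> None:
        # Simulate a downstream failure for one specific item
        if record["dynamodb"]["Keys"]["id"]["S"] == "2":
            raise RuntimeError("downstream unavailable")

    event = {"Records": [
        {"eventSource": "aws:dynamodb",
         "dynamodb": {"Keys": {"id": {"S": str(i)}}, "SequenceNumber": f"10{i}"}}
        for i in range(3)
    ]}
    print(handle_batch(event, flaky_handler))
    # → {'batchItemFailures': [{'itemIdentifier': '102'}]}
```

Two consequences follow. A handler that returns an error code instead of raising is silently counted as a success. And for stream sources Lambda resumes from the lowest reported sequence number, so records after a failure can be delivered again, which is one more reason the downstream consumer should be idempotent.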

app.py

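import os
import boto3

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

from lib.queue import MediaQueue

ENV = os.environ.get("ENV", "dev")

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)

logger = Logger(service="spaces_data_table_stream", level="DEBUG")

# Prefer the QUEUE_URL set by the CDK stack; fall back to the SSM parameter
# used by the original deployment (this path needs ssm:GetParameter permission).
queue_url = os.environ.get("QUEUE_URL")
if not queue_url:
    ssm = boto3.client("ssm")
    queue_url = ssm.get_parameter(
        Name=f"/{ENV}/media-table-process-queue/url")["Parameter"]["Value"]

media_queue = MediaQueue(logger, queue_url)


def record_handler(record: DynamoDBRecord) -> None:
    # BatchProcessor marks a record as failed only if this function raises,
    # so convert a non-200 result into an exception.
    status, detail = media_queue.send_stream_data(record.raw_event)
    if status != 200:
        raise RuntimeError(f"Failed to forward record {record.event_id}: {detail}")


@logger.inject_lambda_context(clear_state=True)
def handler(event: dict, context: LambdaContext) -> dict:
    logger.debug(event)
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)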

lib/queue.py

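import json
import time
import uuid
from typing import Optional

import boto3
from botocore.exceptions import ClientError

sqs = boto3.resource("sqs")

# Error codes treated as throttling (assumed set; botocore's built-in retries also cover these)
THROTTLING_CODES = {"ThrottlingException", "RequestThrottled"}
MAX_ATTEMPTS = 3


class MediaQueue:
    def __init__(self, logger, queue_url: str) -> None:
        self.logger = logger
        # boto3's SQS resource identifies a queue by its URL
        self.media_queue = sqs.Queue(queue_url)

    def send_message(self, body: dict, dedup_id: Optional[str] = None) -> tuple[int, str]:
        """Send one message to the FIFO queue and return (status, detail) instead of raising."""
        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                self.media_queue.send_message(
                    MessageBody=json.dumps(body),
                    MessageGroupId=f"media_{uuid.uuid4()}",
                    MessageDeduplicationId=dedup_id or str(uuid.uuid4()),
                )
                return 200, "Message sent to queue"

            except ClientError as e:
                code = e.response["Error"]["Code"]
                if code in THROTTLING_CODES and attempt < MAX_ATTEMPTS:
                    time.sleep(2 ** (attempt - 1))  # bounded backoff: 1s, 2s
                    continue
                self.logger.exception("SQS send_message failed")
                return 500, f"{code}: {e}"

            except Exception as e:
                self.logger.exception("Unexpected error while sending to SQS")
                return 500, str(e)

        return 500, "Retries exhausted"  # not reached; keeps the return type explicit

    def send_stream_data(self, record: dict) -> tuple[int, str]:
        message = {
            "action": record.get("eventName", "unknown"),
            "body": record.get("dynamodb", {}),
        }
        # The stream eventID is stable across retries, so SQS can drop duplicates
        return self.send_message(message, dedup_id=record.get("eventID"))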
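If MessageGroupId and MessageDeduplicationId are random uuid4() values, a FIFO queue gives up both of its guarantees: every message lands in its own group, so there is no ordering, and a re-sent record gets a new ID, so SQS cannot drop the duplicate. One alternative, sketched below as an assumption rather than the author's design, derives the group ID from the item's primary key, so changes to the same item stay in order, and the deduplication ID from the stream record's eventID. SQS limits both IDs to 128 characters, which the prefixed SHA-256 hex digest (69 characters) satisfies. fifo_ids is a hypothetical helper.

```python
import hashlib
import json


def fifo_ids(record: dict) -> tuple[str, str]:
    """Derive (MessageGroupId, MessageDeduplicationId) for a DynamoDB stream record.

    - Group ID: hash of the item's primary key, so every change to one item
      shares a FIFO group and is delivered in order.
    - Dedup ID: the record's eventID, so re-sending the same record after a
      batch retry is dropped by SQS within its deduplication window.
    """
    keys = record["dynamodb"]["Keys"]
    # sort_keys gives a stable serialization for composite (partition + sort) keys
    canonical = json.dumps(keys, sort_keys=True, separators=(",", ":"))
    group_id = "item_" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return group_id, record["eventID"]


if __name__ == "__main__":
    insert = {"eventID": "a1", "dynamodb": {"Keys": {"id": {"S": "42"}}}}
    modify = {"eventID": "b2", "dynamodb": {"Keys": {"id": {"S": "42"}}}}
    print(fifo_ids(insert)[0] == fifo_ids(modify)[0])  # same item, same group
```

The trade-off: per-item groups preserve ordering per item but cap parallelism at the number of distinct items in flight, while random groups maximize parallelism with no ordering at all.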

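The article does not show the code for the second function (sqsLambda). A minimal standard-library sketch of such a consumer follows. It parses the JSON body that send_stream_data produces ({"action": ..., "body": ...}) and reports failed messages by messageId. Because the queue is FIFO, it follows the approach AWS recommends for FIFO sources with partial batch responses: after the first failure, stop processing and report that message and every later one, so ordering within message groups survives the retry. process_change is a hypothetical placeholder for the real downstream work, such as updating a search index.

```python
import json


def process_change(action: str, body: dict) -> None:
    """Placeholder for downstream work; replace with real logic."""
    if action not in {"INSERT", "MODIFY", "REMOVE"}:
        raise ValueError(f"unexpected action: {action}")


def handler(event: dict, context=None) -> dict:
    failures = []
    failed = False
    for message in event.get("Records", []):
        if failed:
            # FIFO: once a message fails, return the rest unprocessed so they
            # are retried in their original order.
            failures.append({"itemIdentifier": message["messageId"]})
            continue
        try:
            payload = json.loads(message["body"])
            process_change(payload["action"], payload["body"])
        except Exception:
            failed = True
            failures.append({"itemIdentifier": message["messageId"]})
    return {"batchItemFailures": failures}
```

With a standard (non-FIFO) queue the early stop is unnecessary, and each failed message could be reported on its own.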
Copyright © 2024. All rights reserved.