Asynchronous System Integration as Scale

Content Covered

  • Solution and simplified architecture diagram
  • AWS CDK Script (TypeScript)
  • Lambda Function (Python)

Solution

This solution demonstrates how to build an asynchronous webhook processing system using AWS services. The system leverages AWS API Gateway to receive webhook requests, AWS Lambda for processing and validating requests, Amazon S3 for storage, Amazon DynamoDB for tracking, and AWS Step Functions for orchestrating workflows.

Use Case Description

This solution is ideal for scenarios where webhook requests need to be processed asynchronously due to long-running operations or the need for reliable, scalable handling of high-volume incoming requests. Examples include compute intensive ML/simulations, processing payments, handling third-party API callbacks, and managing data ingestion pipelines.

Industry domains

This solution fits for all industry domains where long-running asynchronous jobs are required. E.g. healthcare, biochemical, energy, finance.

Simplified architecture diagram

Architecture Diagram

The architecture consists of:

  1. API Gateway: Receives incoming webhook requests.
  2. AWS Lambda: Processes and validates the requests.
  3. Amazon S3: Stores the validated request data.
  4. Amazon DynamoDB: Stores metadata and S3 object keys.
  5. AWS Step Functions: Manages the workflow triggered by the webhook request.

AWS CDK Script (TypeScript)

import * as cdk from 'aws-cdk-lib';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
import * as stepfunctions_tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

export class WebhookStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const bucket = new s3.Bucket(this, 'WebhookBucket');

    const table = new dynamodb.Table(this, 'WebhookTable', {
      partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
    });

    const webhookFunction = new lambda.Function(this, 'WebhookFunction', {
      runtime: lambda.Runtime.PYTHON_3_8,
      handler: 'index.handler',
      code: lambda.Code.fromAsset('lambda'),
      environment: {
        BUCKET_NAME: bucket.bucketName,
        TABLE_NAME: table.tableName,
      },
    });

    bucket.grantPut(webhookFunction);
    table.grantReadWriteData(webhookFunction);

    const api = new apigateway.RestApi(this, 'WebhookApi', {
      restApiName: 'Webhook Service',
    });

    const webhookIntegration = new apigateway.LambdaIntegration(webhookFunction);
    api.root.addMethod('POST', webhookIntegration);

    const stepFunctionDefinition = new stepfunctions.Pass(this, 'StartState');

    const stepFunction = new stepfunctions.StateMachine(this, 'WebhookStateMachine', {
      definition: stepFunctionDefinition,
    });
  }
}

Lambda Function (Python)

import json
import boto3
import os
from uuid import uuid4

s3_client = boto3.client('s3')
dynamodb_client = boto3.client('dynamodb')

def handler(event, context):
    bucket_name = os.environ['BUCKET_NAME']
    table_name = os.environ['TABLE_NAME']

    # Validate and process the request
    body = json.loads(event['body'])
    if 'data' not in body:
        return {
            'statusCode': 400,
            'body': json.dumps({'message': 'Invalid request'})
        }

    # Store data in S3
    object_key = str(uuid4())
    s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=json.dumps(body['data']))

    # Store metadata in DynamoDB
    dynamodb_client.put_item(
        TableName=table_name,
        Item={
            'id': {'S': object_key},
            'data': {'S': json.dumps(body['data'])}
        }
    )

    # Pass request to Step Functions workflow (simplified for demo)
    # In real scenario, you would trigger the workflow with more details

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Request processed successfully'})
    }

Copyright © 2024. All rights reserved.