AWS CDKでSNS-SQS-Lambdaファンアウトを実装してみた。

2022.02.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

この記事では、AWS CDKでSNS、 SQS、Lambdaを使用してファンアウトアーキテクチャを実装してみました。ファンアウトアーキテクチャを使用すると、SNSトピックから複数のエンドポイントにメッセージを送信できます。Kinesis Data Firehose 配信ストリーム、Amazon SQS キュー、HTTP (S) エンドポイント、Lambda 関数をエンドポイントとして使用できます。ここでは、SNSトピックから複数のSQSキューにメッセージを送信してみました。SQSキューがメッセージを受信すると、Lambda関数がトリガーされてメッセージが処理されます。

 

 

やってみた

 

CDKアプリの作成

  • 次のコマンドを使用してCDKをインストールしておきます。
npm install aws-cdk-lib

 

  • 新しいディレクトリを作成しておきます。
  • CDKは、プロジェクトディレクトリの名前に基づいてソースファイルとクラスに名前を付けます。
#create new directory
mkdir sns-fanout
cd sns-fanout

 

  • cdk initコマンドを使用してアプリを初期化しておきます。
cdk init --language typescript

 

AWS サービスの作成

  • 新しいファイル [lib/index.ts] を作成して、作成する必要のあるAWSサービスを定義しておきます。
  • ファイルに次のAWSサービスを定義しておきます。
    • SNS : メッセージを送信するためのSNSトピック。
    • SQS : 2つのSQSキュー。
      • キューをSNSトピックにサブスクライブします。
    • Lambda 関数 : 2つのLambda関数。
      • LambdaのイベントソースとしてSQSキューを追加します。

 

import { Construct } from 'constructs';
import { SqsSubscription } from 'aws-cdk-lib/aws-sns-subscriptions';
import { Duration, StackProps} from 'aws-cdk-lib';
import { Runtime, Function, AssetCode } from 'aws-cdk-lib/aws-lambda';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

export class snsFanOutStack extends Construct {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id);

    //Create SNS Topic
    const topic = new Topic(this, 'topic', {
        topicName : 'fanOutTopic'
    }
    );

    //Create SQS queue
    const queue1 = new Queue(this, 'fanOutQueue',{
        queueName : 'fanOutQueue1'
    }
    );

    const queue2 = new Queue(this, 'fan-out-Queue',{
      queueName : 'fanOutQueue2'
    }
    );

    //Subscribe queue to SNS Topic
    topic.addSubscription(new SqsSubscription(queue1));
    topic.addSubscription(new SqsSubscription(queue2));
    
    //Create Lambda Functions
    const lambdaFunction1 = new Function(this, "fanOutFunction", {
      code: new AssetCode("resources"),
      handler: "lambdaHandler.handler",
      runtime: Runtime.PYTHON_3_9,
      functionName: "fanOutFunction1",
      timeout: Duration.seconds(30)
    });

    const lambdaFunction2 = new Function(this, "fan-out-Function", {
      code: new AssetCode("resources"),
      handler: "lambda-handler.handler",
      runtime: Runtime.PYTHON_3_9,
      functionName: "fanOutFunction2",
      timeout: Duration.seconds(30)
    });
    
    //Add queue as Event Source 
    lambdaFunction1.addEventSource(new SqsEventSource(queue1));
    lambdaFunction2.addEventSource(new SqsEventSource(queue2));
  }
}

 

Lambda関数の作成

  • プロジェクトのメインディレクトリにresourcesディレクトリを作成しておきます。
mkdir resources

 

  • resourcesディレクトリに2つのファイルを作成しておきます。
    • [resources/lambdaHandler.py] - Queue1のために作成します。
    • [resources/lambda-handler.py] - Queue2のために作成します。
  • 両方のファイルに以下のコードを追加します。

 

import json

def handler(event, context):
    data = event.get("Records")[0].get("body")
    message = json.loads(data)
    print(json.dumps(message['Message']))

 

アプリにサービスを追加する

  • /lib/sns-fanout-stack.ts ファイルに次のコードを追加しておきます。
#Import the Index file created in the previous step
import * as sns from '../lib/index';
new sns.snsFanOutStack(this, 'sns-fan-out');

 

CDK Deploy

  • Deploy する前に、環境をブートストラップする必要があります。
  • 次のコマンドを実行して、AWS環境をブートストラップしておきます。
cdk bootstrap

 

  • CDKを展開しておきます。
cdk deploy

 

  • コンソールでは、サービスが作成されたことを見ることができます。

SNS Topic

 

SQS Queue

 

Lambda Function

 

ファンアウトをテストする

  • SNSコンソールでメッセージを公開しておきます。

 

  • Lambda関数のCloudWatchログでLambdaによって処理されたメッセージを見ることができます。

Function1のログ

 

Function2のログ

 

まとめ

AWS CDKでSNS、 SQS、Lambdaを使用してファンアウトアーキテクチャを実装してみました。

Reference :

SQS Event Source

SQS Queue using CDK

SNS Topic using CDK

SQS Subscription to SNS topic