CDKを使用して SQSでLambda Destinations を作成してみた。

2022.03.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

Lambda Destinationsで非同期呼び出しのレコードを別のサービスに送信することができます。処理に失敗したイベントと正常に処理されたイベントに別々の送信先を設定できます。

Lambda Destinations :

  • Amazon SQS
  • Amazon SNS
  • Lambda
  • EventBridge

この記事では、CDKを使用してSQSでLambda Destinations を作成してみました。ここでは、オブジェクトがS3バケットにアップロードされたときにLambdaをトリガーするS3イベントソースを作成します。次に、Lambda関数は成功イベントと失敗イベントをSQSキューに送信します。

 

 

やってみた

 

CDKアプリの作成

CDKをインストールする

  • 次のコマンドを使用してCDKをインストールしておきます。
npm install aws-cdk-lib

 

CDKアプリを作成する

  • 新しいディレクトリを作成しておきます。
  • CDKは、プロジェクトディレクトリの名前に基づいてソースファイルとクラスに名前を付けます。
#create new directory
mkdir lambda-destinations
cd lambda-destinations

 

  • cdk initコマンドを使用してアプリを初期化しておきます。
cdk init --language typescript

 

Lambda関数の作成

  • プロジェクトのメインディレクトリにresourcesディレクトリを作成しておきます。
mkdir resources

 

  • resourcesディレクトリに次のPythonファイルを作成しておきます。[resources/lambda-handler.py]

 

def handler(event, context):
   #To test successful events
   return {
          'statusCode': 200,
            'body': "SUCCESS"
    }
    #To test failed Events
    #raise Exception("Failed")

 

AWS サービスの作成

  • 新しいファイル [lib/index.ts] を作成して、作成する必要のあるAWSサービスを定義しておきます。
  • ファイルに次のAWSサービスを定義しておきます。
    • S3 Bucket : LambdaFunctionのイベントソースとしてS3バケットを作成します。
    • SQS : 2つのSQSキュー。
      • Success キュー : 成功イベントは成功キューに送信されます。
      • Failure キュー : 失敗したイベントは失敗キューに送信されます。
    • Lambda 関数 : DestinationとしてSQS、イベントソースとしてS3を使用するLambda関数。

 

import { Construct } from 'constructs';
import { Runtime, Function, AssetCode } from 'aws-cdk-lib/aws-lambda';
import { StackProps, RemovalPolicy} from 'aws-cdk-lib';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { SqsDestination} from 'aws-cdk-lib/aws-lambda-destinations';
import {  Bucket,EventType } from 'aws-cdk-lib/aws-s3';
import { S3EventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

export class lambdaDestinationsStack extends Construct {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id);

    const bucket = new Bucket(this, 'bucket', {
      autoDeleteObjects: true,
      removalPolicy: RemovalPolicy.DESTROY
    });


    const eventSource = new S3EventSource(bucket, {
      events: [
        EventType.OBJECT_CREATED_PUT
      ]
    });

    const SuccessQueue = new Queue(this, 'SuccessQueue');
    const FailureQueue = new Queue(this, 'FailureQueue');

    const lambdaFunction = new Function(this, "lambda-sqs", {
        code: new AssetCode("resorces"),
        handler: "lambda-handler.handler",
        runtime: Runtime.PYTHON_3_9,
        functionName: "lambda-destinations-sqs",
        onSuccess: new SqsDestination(SuccessQueue),
        onFailure: new SqsDestination(FailureQueue),
      });

    lambdaFunction.addEventSource(eventSource);
  }
}

 

アプリにサービスを追加する

  • /lib/lambda-destinations-stack.ts ファイルに次のコードを追加しておきます。
#Import the Index file created in the previous step
import * as dest from '../lib/index';
new dest.lambdaDestinationsStack(this, 'lambda-destinations');

 

CDK Deploy

  • Deploy する前に、環境をブートストラップする必要があります。
  • 次のコマンドを実行して、AWS環境をブートストラップしておきます。
cdk bootstrap

 

  • CDKを展開しておきます。
cdk deploy

 

  • コンソールでは、サービスが作成されたことを見ることができます。

Lambda Function

 

SQS Queue

 

Lambda Destinations 

 

Destinationsをテストする

On success

  • 成功イベントをテストするには、Lambdaコードを変更しておきます。

 

def handler(event, context): 
   return { 
      'statusCode': 200, 
      'body': "SUCCESS" 
   }

 

  • S3バケットにファイルをアップロードしておきます。ファイルをS3バケットにアップロードすると、Lambda関数の成功イベントが成功キューに送信されます。これで、成功メッセージがキューにあることをみることができます。

 

  • メッセージをポーリングしてみることができます。

 

On Failure

  • 失敗イベントをテストするには、Lambdaコードを変更しておきます。
def handler(event, context): 
   raise Exception("Failed")

 

  • S3バケットにファイルをアップロードしておきます。
  • 失敗したイベントが失敗キューに送信されたことをみることができます。

 

まとめ

CDKを使用して SQSでLambda Destinations を作成してみました。他のDestinations[SNS, EventBridge, Lambda]でLambda Destinationsを試すことができます。

Reference:

S3 Event Source

Lambda Destinations

Lambda - SQS Destinations