DynamoDBストリームのチュートリアルをやってみた

2020.09.10

DynamoDBテーブルにデータの追加や削除、変更した際にDynamoDBストリームを有効にしていると通知処理などが可能になります。本日はDynamoDBストリームのチュートリアル(以下のリンク)を試したのでまとめたいと思います。

チュートリアル: DynamoDB ストリーム および Lambdaを使用して新しい項目を処理する

作業手順

内容はチュートリアルに沿うように以下の順番で進めていきます。また、cliコマンドはシェルファイルとして作成し実行します。

  1. ストリームが有効になったDynamoDBテーブルを作成
  2. Lambda実行ロールを作成
  3. SNSトピックを作成
  4. Lambda関数の作成
  5. トリガーを作成してテストの実施

1. ストリームが有効になったDynamoDBテーブルを作成

では最初に、ストリームが有効になったDynamoDBテーブルを作成するcliコマンドをmake_dynamo_db.shにまとめます。 作成後の戻り値からLatest_stream_arnのみを表示するためjqコマンドを使用します。

make_dynamo_db.sh

#!/bin/bash

profile=プロファイル名
dynamodb_name=テーブル名

make_dynamo=`aws dynamodb create-table --table-name ${dynamodb_name}\
    --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
    --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=2,WriteCapacityUnits=2 \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
    --profile ${profile}`

Latest_stream_arn=`echo ${make_dynamo} | jq -r ".TableDescription.LatestStreamArn"`
echo "Latest_stream_arn: ${Latest_stream_arn}"

では実行します。実行後にLatest_stream_arnが表示されます。こちらはトリガーの作成時に必要になるので控えておきます。

$ bash make_dynamo_db.sh 
Latest_stream_arn: arn:aws:dynamodb:region:accountID:table/テーブル名/stream/タイムスタンプ

2. Lambda実行ロールを作成

次に、Lambda実行ロールを作成します。こちらはチュートリアルのリンクにある、trust-relationship.jsonrole-policy.json、cliコマンドをまとめて処理するmake_lambda_role.shを作成します。ファイル構成は以下のように同一ディレクトリに配置しています。

$ tree -L 1
.
├── make_role_add_policy.sh
├── role-policy.json
└── trust-relationship.json

role-policy.jsonについて7,16,26行目を修正します。

Lambda関数名DynamoDBテーブル名をチュートリアルとは異なる名前にしていたので7行目と26行目を作成した名称以外にしてしまいエラーになって少しハマりました。

make_role_add_policy.sh

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "arn:aws:lambda:region:accountID:function:Lambda関数名*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:region:accountID:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
            ],
            "Resource": "arn:aws:dynamodb:region:accountID:table/DynamoDBテーブル名/stream/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

ロールの作成とポリシーを追加するcliコマンドをmake_role_add_policy.shにまとめて記載します。作成したroleのarnを取得するためjqを使用します。

make_role_add_policy.sh

#!/bin/bash

profile=プロファイル名
role_name=ロール名
policy_name=ポリシー名

# role作成
make_role=`aws iam create-role --role-name ${role_name} \
    --path "/service-role/" \
    --assume-role-policy-document file://trust-relationship.json \
    --profile ${profile}`

# roleのarnを表示する
role_arn=`echo ${make_role} | jq -r ".Role.Arn"`
echo "role_arn: ${role_arn}"

# roleが作成されるまで待つ
aws iam wait role-exists --role-name ${role_name} --profile ${profile}

# ポリシーを追加
aws iam put-role-policy --role-name ${role_name} \
    --policy-name ${policy_name} \
    --policy-document file://role-policy.json \
    --profile ${profile}

実行します。実行後に、roleのarnが表示されます。こちらはLambda関数を作成する時に必要になりますので控えておきます。

$ bash make_role_add_policy.sh 
role_arn: arn:aws:iam::accountID:role/service-role/ロール名

3. SNSトピックを作成

snsのトピックを作成し、Eメールをサブスクライブします。

make_sns.sh

#!/bin/bash

profile=プロファイル名
topic_name=トピック名
email=メールアドレス

make_sns=`aws sns create-topic --name ${topic_name} --profile ${profile}`
sns_arn=`echo ${make_sns} | jq -r ".TopicArn"`
echo "sns_arn: ${sns_arn}"

subscribe=`aws sns subscribe \
    --topic-arn ${sns_arn} \
    --protocol email \
    --notification-endpoint ${email} \
    --profile ${profile}`

実行します。実行後に、snsのarnが表示されます。こちらはpublishNewBark.jsを作成する時に必要になります。

$ bash make_sns.sh
sns_arn: arn:aws:sns:region:accountID:snsトピック名

コマンド実行後、下記のように確認のメールが届くので、Confirm subscriptionをクリックし許可します。

4. Lambda関数の作成

Lambda関数を作成します。チュートリアルのリンクにあるpublishNewBark.jsを作成しzipファイルにします。そしてLambda関数を作成するcliコマンドを実行するmake_lambda.shを作成します。作成後のディレクトリ構成は以下のようになります。

$ tree -L 1
.
├── make_lambda.sh
├── publishNewBark.js
└── publishNewBark.zip

publishNewBark.jsの17行目を先ほど作成したsnsのarnに変更します。

publishNewBark.js

'use strict';
var AWS = require("aws-sdk");
var sns = new AWS.SNS();

exports.handler = (event, context, callback) => {

    event.Records.forEach((record) => {
        console.log('Stream record: ', JSON.stringify(record, null, 2));

        if (record.eventName == 'INSERT') {
            var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
            var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
            var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
            var params = {
                Subject: 'A new bark from ' + who,
                Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                TopicArn: 'arn:aws:sns:region:accountID:snsトピック名'
            };
            sns.publish(params, function(err, data) {
                if (err) {
                    console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                } else {
                    console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                }
            });
        }
    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};

publishNewBark.jsのzipファイルを作成します。

$ zip publishNewBark.zip publishNewBark.js

Lambda関数を作るmake_lambda.shを作成します。roleARNにはmake_role_add_policy.sh実行時に取得したrole_arnを使用します。

make_lambda.sh

#!/bin/bash

profile=プロファイル名
function_name=Lambda関数名

roleARN="arn:aws:iam::accountID:role/service-role/dynamoテーブル名"

make_lambda=`aws lambda create-function \
    --region ap-northeast-1 \
    --function-name ${function_name} \
    --zip-file fileb://publishNewBark.zip \
    --role ${roleARN} \
    --handler publishNewBark.handler \
    --timeout 5 \
    --runtime nodejs10.x \
    --profile ${profile}`

Lambda関数を作成します。

$ bash make_lambda.sh

5. トリガーを作成してテストの実施

Lambda関数をイベントソース(DynamoDBテーブルストリーム)に関連づけます。

ロールを作成した時に表示したLatest_stream_arnを使用します。今回はチュートリアルと同じようにcliコマンドでのarnを取得します。

make_trigger.sh

#!/bin/bash

profile=プロファイル名
dynamodb_name=dynamoDBテーブル名
function_name=Lambda関数名

# streams arnの取得
dynamodb=`aws dynamodb describe-table --table-name ${dynamodb_name} --profile ${profile}`
streamARN=`echo ${dynamodb} | jq -r ".Table.LatestStreamArn"`

# マッピングを行う
event_mapping=`aws lambda create-event-source-mapping \
    --region ap-northeast-1 \
    --function-name ${function_name} \
    --event-source ${streamARN} \
    --batch-size 1 \
    --starting-position TRIM_HORIZON --profile ${profile}`

トリガーを追加します。

$ bash make_trigger.sh

トリガーの設定ができたので実際にデータをinsertし、メールが届くか確認します。テストを行うためtest.shを作成します。

test.sh

#!/bin/bash

profile=プロファイル名
aws dynamodb put-item \
    --table-name cm-shimoji-dynamo-test \
    --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"} \
    --profile ${profile}

実行します。

$ bash test.sh

実行後、以下のようにメールが届きDynamoDBのinsert時の処理をトリガーにLambdaが起動されていることを確認しました。

まとめ

DynamoDBストリームに興味があり調べてみるとチュートリアル記事がありましたのでcliコマンドをシェルファイルにまとめながら実行する内容で沖縄の下地がお届けしました。この記事がどなたかの助けになれば幸いです。

参考リンク