【新機能】Amazon DynamoDB Triggersを使ってDynamoDB StreamsとAWS Lambdaを連携する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、せーのです。今日は個人的にも待ちに待ったアップデートが入ってきました。そう、DynamoDB Streams!!! この機能を使うとデータの入出力がそのままストリームとして後方に流れていき、トリガーとしてEC2やLambdaと連携することができるのです。 もうポーリングとかそういう煩わしい作業は入りません。値が登録されたことをDB自らが教えてくれるのです。すごい!かっこいい!!

、、、多少取り乱しました。では中身を見ていきましょう。

DynamoDB Streamsとは

今回のアップデートでDynamoDB Streamsを有効にするとDynamoDB内でのアイテムの変化(登録、更新、削除)が24時間ストリームとして時間順に流されます。このストリームには簡単なAPIでアクセスすることができ、その中身を他のデータストア、例えばRDSやElastiCache等に使用することが可能となります。 アクセスするにはKinesis Client Library(KCL)をEC2にインストールし、そこからAPIを叩きます。そう、DynamoDB Streamsの仕組みはKinesisに非常に似ているんですね。というかもうお手軽なKinesisと言ってもいいかもしれません。なのでKinesisのデータを受け取る仕組みを使ってDynamoDB Streamsも受け取ることができます。 その他にもDynamoDBのSDKを使用してもDynamoDB Streamsを受け取ることが出来るようになっています(preview時にはなかったと思います。これは便利!)。KCLはちょっとハードルが高い、という方はこちらを使用すると簡単にデータを取ることが出来ます。

またこのストリームが流れた事をトリガーとして(これをDynamoDB Triggersといいます)Lambdaを発火させることができます。VPC内に配置されていないElastiCacheやS3等にデータを流す、またデータの中身を使って次のアクションを決める(SNSで通知する等)時にもこちらの機能が使えますね。今回はこちらを試してみたいと思います。

やってみた

では早速やってみましょう。マネージメントコンソールからDynamoDBを開きます。

dynamodbstreamswithlambda1

テーブルを作ります。今回はID、年齢、名前という組み合わせのデータを想定します。ハッシュキーにIDを指定します。

dynamodbstreamswithlambda2

一応IDと年齢でインデックスも作っておきます。今回は使いませんが。

dynamodbstreamswithlambda3

スループットを指定します。テストなので読み書きとも1ユニットにしておきます。本来はここがDynamoDBで一番悩むところです。画面上にある「プロビジョニングする必要のあるスループットキャパシティーを計算するにはどうすればよいですか?」のチェックをクリックするとスループットの計算画面に遷移しますのでこちらを上手に使ってスループットを計算して下さい。

dynamodbstreamswithlambda4

次にキャパシティの警告アラームの有無を聞かれます。今回は使わないのでチェックを外します。 そしていよいよここでストリームの有効化が出てきます。「表示タイプ」というのは実際にストリームに流れるデータの内容を表しています。「キーのみ」「新イメージ(データ更新後の内容)」「新旧イメージ(データ更新前の内容と更新後の内容を両方表示する)」の中から選びます。今回は「新旧イメージ」を選択します。

dynamodbstreamswithlambda5

確認画面を通してテーブルを作成します。テーブルが出来ると新たに下に「ストリーム」というタブが増えていますので選択してみると、先ほどの内容でストリームができています。簡単ですね。

dynamodbstreamswithlambda6

次にこのストリームに連携させるLambdaを作っていきます。マネージメントコンソールからLambdaを開いて新規のFunctionを作成するボタンを押下するとblueprintに「dynamodb-process-stream」というblueprintがありますのでこちらを使いましょう。blueprintについてはこちらの記事を御覧ください。

dynamodbstreamswithlambda7

イベントソースを選択する画面ではDynamoDBの先程作成したテーブルを選択します。「Batch size」というのはこの関数一回に処理するストリームの数を表します。1件ずつ処理するなら1を、最大で10000まで指定できます。複数のレコードをいっぺんに処理するならLambda Functionの中でループ処理させることになりますね。 「Starting Position」というのは読み取りを開始する位置を表します「Latest(最新のものから順番に読み取る)」と「Trim horizon(読み取りされていないものを古い順から読み取る)」の2種類から選択します。時間順にデータを処理するならTrim horizon、最新のデータを使用する設計ならLatestを選びましょう。

dynamodbstreamswithlambda8

Functionの作成画面です。今回はblueprintそのままでいきたいと思います。ちなみにDynamoDB TriggersはNode.jsでもJavaでもどちらでも使えます。

dynamodbstreamswithlambda9

Lambda Functionに対する権限をIAM Roleにて設定します。今回はDynamoDBを使うのでDynamoDBとCloudWatch Logsの操作権限を付けます。これもサンプルとしてDynamoDB Stream用のRoleが用意されているのでそちらを選択して新規に作成するだけでOKです。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": [
        "*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

dynamodbstreamswithlambda10

確認画面でイベントソースをいますぐ有効にするかを選択することができます。一旦テストしてから有効にする場合は「Enable later」、Functionの内容が問題ない場合は「Enbale now」を選びましょう。今回は「Enable later」を選びます。

dynamodbstreamswithlambda11

これでLambda Functionができました。Testボタンを押してFunctionをテストしてみます。

dynamodbstreamswithlambda13

サンプルとしてのテストデータを入力する画面が出てきます。こちらで「DynamoDB Update」を選択すると一般的なDynamoDBのINSERT,UPDATE,DELETEのサンプルデータが出てきます。

dynamodbstreamswithlambda14

こちらを元に今回のテーブル内容に合うようにテストデータを改修していきます。リージョンを東京に、データをID,age,nameに増やします。

{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-1",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          },
          "age": {
            "N": "25"
          },
          "name": {
            "S": "Tsuyoshi Seino"
          }
        },
        "SequenceNumber": "111",
        "SizeBytes": 26,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:account-id:table/testchao2suke/stream/2015-06-27T00:48:05.899"
    },
    {
      "eventID": "2",
      "eventName": "MODIFY",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-northeast-1",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          },
          "age": {
            "N": "39"
          },
          "name": {
            "S": "Tsuyoshi Seino"
          }
        },
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          },
          "age": {
            "N": "25"
          },
          "name": {
            "S": "Tsuyoshi Seino"
          }
        },
        "SequenceNumber": "222",
        "SizeBytes": 59,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:account-id:table/testchao2suke/stream/2015-06-27T00:48:05.899"
    },
    {
      "eventID": "3",
      "eventName": "REMOVE",
      "eventVersion": "1.0",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-west-2",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "OldImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          },
          "age": {
            "N": "25"
          },
          "name": {
            "S": "Tsuyoshi Seino"
          }
        },
        "SequenceNumber": "333",
        "SizeBytes": 38,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:account-id:table/testchao2suke/stream/2015-06-27T00:48:05.899"
    }
  ]
}

こちらでSubmitを押すとこのデータをLambda Functionに流してくれます。結果ログが下に出てきますので確認しましょう。

dynamodbstreamswithlambda15

テストが問題なかったのでLambda Functionを有効化します。Lambda FunctionのStateをクリックします。

dynamodbstreamswithlambda16

確認画面が出てくるのでDynamoDB Streamsがあっているかどうか確認して「Enable」ボタンを押します。これでDynamoDB StreamsとLambda Functionがつながりました。

dynamodbstreamswithlambda17

dynamodbstreamswithlambda18

ちなみにDynamoDB StreamsとLambda Functionの連携はDynamoDB側からもできます。DynamoDB Streamsの対象ストリームを選択し「Lambda関数を関連付ける」をクリックします。

dynamodbstreamswithlambda19

すると先程Lambda Functionの作成時に最初に設定した読み取りの開始点とバッチサイズの指定ができます。この時点でLambda Functionを作っていなければ文章内のリンクからLambdaのコンソールに飛べますのでそちらから設定して下さい。DynamoDB Streamsの方は日本語化されていてLambdaの方は英語のままなので多少混乱しますね。

dynamodbstreamswithlambda20

さて、どちらかの方法を使ってDynamoDB StreamsとLambda Functionを連携させたら実際にDynamoDBにデータを入れてみましょう。DynamoDBのテーブルを開いて「アイテムの作成」をクリックします。

dynamodbstreamswithlambda21

データをJSON形式で作り「保存」をクリックします。

dynamodbstreamswithlambda22

dynamodbstreamswithlambda23

データが増えたことを確認します。これで既にDynamoDB Streamsにデータが流れ、Lambdaが発火しているはずです。

dynamodbstreamswithlambda24

Lambdaが正常に発火したかどうかログを見て確認しましょう。CloudWatch Logsを開きます。マネージメントコンソールからCloudWatchにいき、「ログ」から先程作ったLambda Functionのログを開きます。

dynamodbstreamswithlambda25

INSERTというタイプで先ほどのデータがLambdaにわたっていることが確認できます。

dynamodbstreamswithlambda26

次にデータを更新してみます。DynamoDBに戻って先ほどのデータの年齢と名前を変えてみます。

dynamodbstreamswithlambda27

dynamodbstreamswithlambda28

再びCloudWatch Logsを見てみます。NewImageの所に変更後のデータが、OldImageのところには変更前のデータがあることが確認できます。この差分を使って例えばリアルタイムに更新されるデータポイントをグラフ化したりするわけです。

dynamodbstreamswithlambda29

まとめ

いかがでしたでしょうか。やってみると簡単なことがわかるかと思います。こんなに簡単にデータドリブンのシステムが、しかもサーバーレスで作れる、というのは本当にいい時代になりました! エンジニアとしてベテランの方の中にはNo SQLのデータストアであるDynamoDBに抵抗のあった方も結構いるかと思います。私も最初は抵抗があったので。でも使ってみるとレスポンスの速さ等は特筆すべきところがあり、慣れればクエリやスキャンも自由にできるようになります。 これを機会にDynamoDBを触ってみて、サーバーレスなシステムを構築してみてはいかがでしょうか?

参考リンク