[アップデート]DynamoDBのアイテムレベルの変更をKinesis Data Streamsで直接キャプチャできるようになりました
CX事業本部@大阪の岩田です。
2020/11/23付けのアップデートによりKinesis Data Streams for DynamoDBが利用可能になりました。この機能を利用すると、DynanoDBのアイテムレベルの変更をKinesis Data Streamsで直接キャプチャすることが可能です。
これまでもDynamoDB ストリームを利用してDynamoDBの変更をキャプチャすることはできましたが、今回Kinesis Data Streamsによるサポートが追加されたことで、より高度なシナリオに対応できるようになりました。
例えばですが
- Kinesis Data Analyticsを利用してストリームされた変更内容を分析する
- Kinesis Data Firehoseを利用してS3やAmazonES、Redshiftといった別サービスにデータを投入する
といったことが可能になりました。これまでもDynamoDB ストリームからLambdaを起動すればS3やAmazonES、Redshiftにデータを投入することは可能でしたが、今回のアップデートにより(データ変換を行わないのであれば)Lambda無しでもデータの投入が可能になりました!!※Firehoseを利用する場合はDynamoDBストリームからLambdaをトリガーするのに比べてデータ同期までのタイムラグが大きくなるので、その点も要注意です
また、DynamoDB ストリームからLambdaを起動するアーキテクチャは
- バッチサイズ
- 並列化係数
こそ調整可能でしたが、DynamoDBストリームのシャード数を調整するという概念がありませんでした。そのため、大量にDynamoDBの更新操作が発生するようなユースケースでDynamoDBストリームを採用する場合は、DynamoDBストリームが「詰まる」という懸念点がありました。Kinesis Data Streamsであれば、シャード数を調整することでスループットを上げることができるので、ストリームが「詰まる」というリスクを低減できます。
やってみる
実際にAmazon Kinesis Data Streams for DynamoDBを利用してDyanamoDBからKinesis Data Streamにデータを流してみましょう。
Amazon ESのドメインを作成
まずはAmazonESのドメインを作成します。マネコンから「次へ」、「次へ」で進めていきます。
ドメインが作成できたらkibanaの画面を確認しておきましょう。最初はインデックスが作成されていないので、特に何も見えないはずです。
Kinesis Data Streamsの作成
続いてKinesis Data Streamsを作成します。こちらも特に特別なことはせずにマネコンから「次へ」、「次へ」で進めていきます。簡単な動作確認が目的なので、シャード数は1にしています。
Kinesis Data FirehoseのDeliveryStream作成
作成したデータストリームのコンシューマーとして配信ストリームを作成します。
先程作成したデータストリームをsourceに、destinationはAmazon ESのドメインを指定します
DynamoDBのテーブル作成とストリームの有効化
続いてDynamoDBのテーブル作成とストリーム有効化を行います。テスト用におなじみのMusicテーブルを作成し、このテーブルに対してKinesis Data Streams for DynamoDBを有効化してみます。
「Kinesis へのストリーミングを管理」というボタンが増えているので、ここから先程作成したデータストリームへのストリームを有効化します。
DynamoDBにデータを投入してKibanaから確認してみる
準備が整ったので、DynamoDBに適当にデータを投入してみましょう。せっかくなのでPartiQLで投入してみます。
$aws dynamodb execute-statement --statement \ "INSERT INTO Music \ VALUE \ {'Artist':'Acme Band','SongTitle':'PartiQL Rocks1'}" $aws dynamodb execute-statement --statement \ "UPDATE Music \ SET AwardsWon=1 \ SET AwardDetail={'Grammys':[2020, 2018]} \ WHERE Artist='Acme Band' AND SongTitle='PartiQL Rocks1'" $aws dynamodb execute-statement --statement \ "INSERT INTO Music \ VALUE \ {'Artist':'Acme Band','SongTitle':'PartiQL Rocks2'}" $aws dynamodb execute-statement --statement \ "INSERT INTO Music \ VALUE \ {'Artist':'Acme Band','SongTitle':'PartiQL Rocks3'}" $aws dynamodb execute-statement --statement \ "DELETE FROM Music \ WHERE Artist='Acme Band' AND SongTitle='PartiQL Rocks2'"
データ投入後Firehoseによるデータ同期が実行されるまでしばらく待ちます...
しばらく待ってから再度Kibanaを確認すると...
FirehoseのDeliveryStream作成時に指定したインデックスdynamodb-table
が作成されたことが分かります。
Indexを指定してそのままCreate Index Pattern...
Discoverから確認してみましょう
見えました!単にストリームに流れてきた内容をそのままAmazonESに突っ込んでるだけなので、いわゆるCQRS(Command-Query Responsibility Segregation)な構成の実装パターンとしては微妙ですが、まあ今回はちょっと触ってみるのが目的なので、これで良しとします。
まとめ
Kinesis Data Streams for DynamoDBについてご紹介しました。今回紹介したパターン以外にもDynamoDBのデータをS3やRedshiftに連携しているケースなど、色々と利用シーンが想定される熱いアップデートですね。Kinesisファミリーとの組み合わせが便利というメリット以外にもシャード数を制御できるというメリットもあるので、DynamoDBストリームを利用していたシステムではKinesis Data Streams for DynamoDBへの乗り換えを検討してみてはいかがでしょうか?