[アップデート]DynamoDBのアイテムレベルの変更をKinesis Data Streamsで直接キャプチャできるようになりました

DynamoDBストリーム起動のLambdaが不要になるかも??
2020.11.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

CX事業本部@大阪の岩田です。

2020/11/23付けのアップデートによりKinesis Data Streams for DynamoDBが利用可能になりました。この機能を利用すると、DynanoDBのアイテムレベルの変更をKinesis Data Streamsで直接キャプチャすることが可能です。

これまでもDynamoDB ストリームを利用してDynamoDBの変更をキャプチャすることはできましたが、今回Kinesis Data Streamsによるサポートが追加されたことで、より高度なシナリオに対応できるようになりました。

例えばですが

  • Kinesis Data Analyticsを利用してストリームされた変更内容を分析する
  • Kinesis Data Firehoseを利用してS3やAmazonES、Redshiftといった別サービスにデータを投入する

といったことが可能になりました。これまでもDynamoDB ストリームからLambdaを起動すればS3やAmazonES、Redshiftにデータを投入することは可能でしたが、今回のアップデートにより(データ変換を行わないのであれば)Lambda無しでもデータの投入が可能になりました!!※Firehoseを利用する場合はDynamoDBストリームからLambdaをトリガーするのに比べてデータ同期までのタイムラグが大きくなるので、その点も要注意です

また、DynamoDB ストリームからLambdaを起動するアーキテクチャは

  • バッチサイズ
  • 並列化係数

こそ調整可能でしたが、DynamoDBストリームのシャード数を調整するという概念がありませんでした。そのため、大量にDynamoDBの更新操作が発生するようなユースケースでDynamoDBストリームを採用する場合は、DynamoDBストリームが「詰まる」という懸念点がありました。Kinesis Data Streamsであれば、シャード数を調整することでスループットを上げることができるので、ストリームが「詰まる」というリスクを低減できます。

やってみる

実際にAmazon Kinesis Data Streams for DynamoDBを利用してDyanamoDBからKinesis Data Streamにデータを流してみましょう。

Amazon ESのドメインを作成

まずはAmazonESのドメインを作成します。マネコンから「次へ」、「次へ」で進めていきます。

ドメインが作成できたらkibanaの画面を確認しておきましょう。最初はインデックスが作成されていないので、特に何も見えないはずです。

Kinesis Data Streamsの作成

続いてKinesis Data Streamsを作成します。こちらも特に特別なことはせずにマネコンから「次へ」、「次へ」で進めていきます。簡単な動作確認が目的なので、シャード数は1にしています。

Kinesis Data FirehoseのDeliveryStream作成

作成したデータストリームのコンシューマーとして配信ストリームを作成します。

先程作成したデータストリームをsourceに、destinationはAmazon ESのドメインを指定します

DynamoDBのテーブル作成とストリームの有効化

続いてDynamoDBのテーブル作成とストリーム有効化を行います。テスト用におなじみのMusicテーブルを作成し、このテーブルに対してKinesis Data Streams for DynamoDBを有効化してみます。

「Kinesis へのストリーミングを管理」というボタンが増えているので、ここから先程作成したデータストリームへのストリームを有効化します。

DynamoDBにデータを投入してKibanaから確認してみる

準備が整ったので、DynamoDBに適当にデータを投入してみましょう。せっかくなのでPartiQLで投入してみます。

$aws dynamodb execute-statement --statement \
    "INSERT INTO Music  \
    VALUE  \
    {'Artist':'Acme Band','SongTitle':'PartiQL Rocks1'}"

$aws dynamodb execute-statement --statement \
    "UPDATE Music  \
    SET AwardsWon=1  \
    SET AwardDetail={'Grammys':[2020, 2018]}  \
    WHERE Artist='Acme Band' AND SongTitle='PartiQL Rocks1'"
    
$aws dynamodb execute-statement --statement \
    "INSERT INTO Music  \
    VALUE  \
    {'Artist':'Acme Band','SongTitle':'PartiQL Rocks2'}"
    
$aws dynamodb execute-statement --statement \
    "INSERT INTO Music  \
    VALUE  \
    {'Artist':'Acme Band','SongTitle':'PartiQL Rocks3'}"
    
$aws dynamodb execute-statement --statement \
    "DELETE  FROM Music  \
    WHERE Artist='Acme Band' AND SongTitle='PartiQL Rocks2'"

データ投入後Firehoseによるデータ同期が実行されるまでしばらく待ちます...

しばらく待ってから再度Kibanaを確認すると...

FirehoseのDeliveryStream作成時に指定したインデックスdynamodb-tableが作成されたことが分かります。

Indexを指定してそのままCreate Index Pattern...

Discoverから確認してみましょう

見えました!単にストリームに流れてきた内容をそのままAmazonESに突っ込んでるだけなので、いわゆるCQRS(Command-Query Responsibility Segregation)な構成の実装パターンとしては微妙ですが、まあ今回はちょっと触ってみるのが目的なので、これで良しとします。

まとめ

Kinesis Data Streams for DynamoDBについてご紹介しました。今回紹介したパターン以外にもDynamoDBのデータをS3やRedshiftに連携しているケースなど、色々と利用シーンが想定される熱いアップデートですね。Kinesisファミリーとの組み合わせが便利というメリット以外にもシャード数を制御できるというメリットもあるので、DynamoDBストリームを利用していたシステムではKinesis Data Streams for DynamoDBへの乗り換えを検討してみてはいかがでしょうか?