DynamoDB Stream を利用してDynamoDBとElasticsearchのデータ連携をやってみた

DynamoDB Stream を利用してDynamoDBとElasticsearchのデータ連携をやってみた

AWS DynamoDB Streamを利用しDynamoDBとElasticsearchのデータ連携の関する記事です。AWS LambdaはTypeScriptで作成して、serverless frameworkでデプロイします。
Clock Icon2019.08.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!コンサルティング部のキムです。

以前から外国で仕事をしたいなーと思ってましたが、言語が違うことの難しさを感じてます。もっと頑張っていきたいと思います!:) ここ数年、DynamoDBに興味があったのですが、DynamoDB Streamは全く触ったことがありませんでした。AppSyncでよく使われるパターンでもあるので、良い機会だと思ってDynamoDB Stream → ElasticSearch のデータ連携の構成をやってみました。

ElasticsearchのIndexingは1秒ほど掛かるためリアルタイムで連携されるのではありませんが、普通のユーズケースには大丈夫と思います。DynamoDBがメインDBになった場合でも、検索機能の限界の為、GSIやLSI等で頑張らなくて済む良いパターンだと思います。

それでは、始めます!

目次

DynamoDB Streamとは

DynamoDB Streamとは、名前のままDynamoDBテーブルのデータに変更が発生された瞬間、この変更事項をイベントストリームの形で処理できる機能です。ストリーミングイベントが発生したら、登録されたAWS Lambda等のサービスを呼び出してイベントを処理します。この時、Lambdaのコードの問題ではない原因で関数が失敗した場合、つまり、DynamoDB Streamからイベントソース関数の呼び出しが失敗した場合には、AWS Lambdaはその失敗したレコードバッチを最大7日間リトライ処理を試みます。非常に高いレベルの安全性が提供されているので、安心して利用できます。

Elasticsearchとは

Elasticsearch は Apache Lucene(ルシーン)基盤の分散処理検索エンジンにて、主にELKスタックやElastic Stackのコアコンポーネントとして知られています。しばらく前までにはAmazon Elasticsearch Serviceで6.7バージョンまで利用可能でしたが(現時点での最新バージョンは7.3)これからは6.8または7.1バージョンも利用可能になりました!6.xバージョンと7.xバージョンの違いはElasticの公式文書をご参考ください。

やってみた

※英語のマネージメントコンソールを参照しています

DynamoDB Streamを利用してElasticsearchにデータ連携する方法は以下のように簡単です。 1. DynamoDBテーブルが無ければ一つ作ります。 2. 該当DynamoDBテーブルのStream機能を有効化します。 3. Amazon Elasticsearch Serviceのドメイン(クラスター)を生成します。 4. Lambda関数を作って、DynamoDB Streamイベントを処理するように設定します。

1. DynamoDBテーブル生成

この記事では新しいDynamoDBテーブルを作るのから始めます。テーブル名はEmployee、パーティションキーは id (String) で指定しました。他の設定はデフォルトで大丈夫です。

2. DynamoDB Stream 有効化

テーブルのオーバービューの画面でストリームManage Streamボタンをクリックします。

View TypeはKeys only、New image、Old imageの三つの種類があります。 Keys onlyは変更されたデータのキーのみを渡す方式です。 New imageは新たなデータ(生成、変更)を渡す方式です。 Old imageは既存データ(変更、削除)を渡す方式です。 本記事ではNew and old imagesを選択して進めます。

以下のようにストリームが生成されたらarnを確認しておきます。

3. Amazon Elasticsearch Service ドメイン生成

Amazon Elasticsearch Serviceではドメインという名前を元のElasticsearch Clusterとほぼ一緒の意味で使います。ドメインの場合、AWSのいろんな設定が含まれてありますが、本記事の範囲以外になる為省略します。 Amazon Elasticsearch Serviceコンソル画面でCreate a New Domainボタンをクリックしましょう。 以下のようにDevelopment and testingと選択してバージョンは6.7を選択します。

ドメイン名は命名規則に留意して作成します。費用はなるべく節約したいのでインスタンスタイプはt2.small.elasticsearchにしました。

本記事ではセキュリティーや設定等を簡単にしてDynamoDB Streamとの連携だけを証明したく、Public accessを許可します。

Elasticsearchドメインが生成されるまでに10〜20分ほどの時間がかかります。ドメインが生成されたら以下のようにエンドポイントの確認ができます。

4. Lambda関数作成

LambdaはTypescriptで作成しました。

dynamoStreamHandler.ts

import { DynamoDBStreamEvent } from "aws-lambda";
import { insertDataToES, modifyDataToES, removeDataToES } from "./streamUtils";

const esDomainUrl = 'https://search-my-search-domain-qdttvcr4xck44yqqhledc6g5qm.ap-northeast-1.es.amazonaws.com';
const indexName = 'employee';
const typeName = 'doc';

export function handler(event: DynamoDBStreamEvent) {
  const records = event.Records;

  const keyName = 'id';
  const requestUrl = esDomainUrl + '/' + indexName + '/' + typeName + '/';

  records.forEach(r => {
    const type = r.eventName;
    const dynamodb = r.dynamodb!;

    switch (type) {
      case "INSERT":
        insertDataToES(keyName, requestUrl, dynamodb);
        break;
      case "MODIFY":
        modifyDataToES(keyName, requestUrl, dynamodb);
        break;
      case "REMOVE":
        removeDataToES(keyName, requestUrl, dynamodb);
        break;
      } 
  });
}

streamUtils.ts

import { StreamRecord } from "aws-lambda";
import AWS from "aws-sdk";
import axios from "axios";

export function insertDataToES(keyName: string, requestUrl: string, dynamodb: StreamRecord) {
    const insertedData = AWS.DynamoDB.Converter.unmarshall( dynamodb.NewImage! );
    requestUrl += insertedData[keyName] + '/';

    axios.post(requestUrl, insertedData).then(result => {
        console.log(result);
    }).catch(err => {
        console.error(err);
    })
}

export function modifyDataToES(keyName: string, requestUrl: string, dynamodb: StreamRecord) {
    const modifiedData = AWS.DynamoDB.Converter.unmarshall( dynamodb.NewImage! );
    const wrappedData = { "doc": modifiedData };
    requestUrl += modifiedData[keyName] + '/_update';

    axios.post(requestUrl, wrappedData).then(result => {
        console.log(result);
    }).catch(err => {
        console.error(err);
    })
}

export function removeDataToES(keyName: string, requestUrl: string, dynamodb: StreamRecord) {
    const removedData = AWS.DynamoDB.Converter.unmarshall( dynamodb.OldImage! );
    requestUrl += removedData[keyName];

    axios.delete(requestUrl).then(result => {
        console.log(result);
    }).catch(err => {
        console.error(err);
    })
}

私はこのLambda関数をサーバーレスフレームワークを用いてデプロイしましたので、serverless.ymlにDynamoDB Stream情報を書きます。

service:
name: ddb-stream-to-es-test
frameworkVersion: '>=1.0.0 <2.0.0'
provider:
name: aws
runtime: nodejs8.10
stage: dev
region: ap-northeast-1
functions:
DynamoDBCallback:
  handler: dist/dynamoStreamHandler.handler
  events:
    - stream:
        arn: DDB_STREAM_ARN
        type: dynamodb
        batchSize: 100
  memorySize: 128
  timeout: 60

私がテストしたコードはGithubにアップロードしましたので、ご参考ください。

以下のコマンドを入力してデプロイします。

npm run deploy

5. テスト及び結果

実際にこれだけで動くのか確認しましょう。DynamoDB Employee テーブルに新たなデータを生成します。

データが生成されたらすぐDynamoDB Streamを介してLambda関数が呼び出されます。CloudWatch Logsのログを確認したら正常に実行されました。

このログを見るとElasticsearchにIndexを生成するのも同時にデータがInsertされたようです。Elasticsearchのエンドポイントを通じて確認しましょう。

INSERTは成功でした。あとMODIFYやREMOVEもしっかり動くのか確認してみます。

roleフィールドを追加しました。

予想通り動作しました!最後にREMOVEです。

綺麗に消されました!

まとめ

DynamoDB + Elasticsearchの組み合わせはよく使われるパターンでもありますが、ブログにまとめられた記事が見つからなかったので自分で書きました。DynamoDB Streamはクリックするだけで有効化されるほど簡単だし、Lambda関数一つだけで2種類のデータストアーの同期化ができるのは素晴らしいことだと感じました。また、DynamoDBのいろんなパターンを勉強したいと思いました。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.