Logstash Plugin for Amazon DynamoDBを使ってDynamoDBとElasticsearchを同期する

はじめに

AWS Official Blogにて、Logstash Plugin for Amazon DynamoDBという面白いツールが発表されました!

要は、Amazon DynamoDBに入れたデータをLogstashのpluginを使ってDynamoDB streamsから取得します。LogstashなのでそのままElasticsearchにデータを突っ込めます。そうするとDynamoDBとElasticsearchのデータが同期できます。便利!

ということで早速やってみました!

準備

DynamoDBのセットアップ

新規にDynamoDBテーブルを作成します。テーブル名は"member"、Primary Key TypeはHash、KeyはNumberで"id"という名前にしました。

DynamoDB_·_AWS_Console 2

このDynamoDBテーブルではStreamsを有効にします。View Typeは"New and Old Images"を指定します。

DynamoDB_·_AWS_Console 3

Logstashのインストール

Logstashはjavaで動くので、javaが入ってることを確認します。

$ java -version
java version "1.7.0_85"
OpenJDK Runtime Environment (amzn-2.6.1.3.61.amzn1-x86_64 u85-b01)
OpenJDK 64-Bit Server VM (build 24.85-b03, mixed mode)

次にLogstashの公式Webサイトからrpmをゲットしてインストールします。

$ wget https://download.elastic.co/logstash/logstash/packages/centos/logstash-1.5.3-1.noarch.rpm
$ sudo rpm -ivh logstash-1.5.3-1.noarch.rpm

インストールされたことを確認します。

$ /opt/logstash/bin/logstash --version
logstash 1.5.3

この状態だとLogstashのバイナリがあるディレクトリにPATHが通っていません。この後の作業のためにPATHを通しておきます。

$ export PATH=$PATH:/opt/logstash/bin

JRubyのインストール

Logstash Plugin for Amazon DynamoDBは、関連しているライブラリがJRubyのみをサポートしている関係で、JRubyが使える環境にする必要があります。まずはRVMをインストールします。

$ curl -O https://raw.githubusercontent.com/rvm/rvm/master/binscripts/rvm-installer
$ curl -sSL https://rvm.io/mpapis.asc | gpg2 --import -
$ sudo bash rvm-installer stable

RVM用の環境変数を設定した上で、JRubyをインストールします。

$ source ~/.profile
$ rvm install jruby

JRubyを使うように設定します。

$ rvm use jruby-1.7.19
Using /home/ec2-user/.rvm/gems/jruby-1.7.19

Logstash Plugin for Amazon DynamoDBのインストール

まず最初にbundlerをインストールしておきます。

$ gem install bundler

Logstash Plugin for Amazon DynamoDBのGitリポジトリをcloneします。

$ git clone https://github.com/awslabs/logstash-input-dynamodb
$ cd ./logstash-input-dynamodb

そしてbundle installします。

$ bundle install
Bundle complete! 3 Gemfile dependencies, 40 gems now installed.
Use `bundle show [gemname]` to see where a bundled gem is installed.

gem buildしてPluginインストール用のgemファイルを作成します。

$ gem build logstash-input-dynamodb.gemspec
  Successfully built RubyGem
  Name: logstash-input-dynamodb
  Version: 0.1.0
  File: logstash-input-dynamodb-0.1.0.gem

出来上がったgemファイルをpluginコマンドでインストールします。

$ sudo /opt/logstash/bin/plugin install --no-verify ./logstash-input-dynamodb-0.1.0.gem
Installation successful

Elasticsearchのインストール

最新のElasticsearchのrpmファイルをゲットし、インストールします。

$ wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.noarch.rpm
$ sudo yum install elasticsearch-1.7.1.noarch.rpm

そして起動。

$ sudo service elasticsearch start
Starting elasticsearch:                                    [  OK  ]

これで準備は完了です。

やってみた

logstashをLogstash Plugin for Amazon DynamoDBを使う形で起動します。

$ logstash -e 'input { 
    dynamodb{endpoint => "dynamodb.ap-northeast-1.amazonaws.com" 
    streams_endpoint => "streams.dynamodb.ap-northeast-1.amazonaws.com" 
    view_type => "new_and_old_images" 
    aws_access_key_id => "<YOUR_ACCESS_KEY>" 
    aws_secret_access_key => "<YOUR_SECRET_KEY>" 
    table_name => "member"} } 
output { 
    elasticsearch 
      { host => localhost
        port => "9200"
        protocol => "http"
        index => "dynamodb"
        document_type => "member"
      } 
    stdout { } }'

これでDynamoDBのmemberテーブルに入れたデータを、LogstashがDynamoDB streams経由で取得し、localhostにあるElasticsearchに投入します。ではDynamoDBのmemberテーブルにデータを投入してみます。

$ aws dynamodb put-item --table-name member --item '{"id":{"N":"1"},"Name":{"S":"sasaki"}}'
$ aws dynamodb put-item --table-name member --item '{"id":{"N":"2"},"Name":{"S":"kaji"}}'
$ aws dynamodb put-item --table-name member --item '{"id":{"N":"3"},"Name":{"S":"watanabe"}}'

Logstashでは以下のようなイベントログが出ます。

2015-08-18T01:45:58.369Z ip-172-31-14-208 {"eventID":"6eb45a35ecd59bd577e54258ed4ad0b6","eventName":"INSERT","eventVersion":"1.0","eventSource":"aws:dynamodb","awsRegion":"ap-northeast-1","dynamodb":{"keys":{"id":{"N":"1"}},"newImage":{"id":{"N":"1"},"Name":{"S":"sasaki"}},"sequenceNumber":"700000000000042797024","sizeBytes":18,"streamViewType":"NEW_AND_OLD_IMAGES"}}
2015-08-18T01:46:03.330Z ip-172-31-14-208 {"eventID":"02c0d03dfa5bc977c92111568b5fe552","eventName":"INSERT","eventVersion":"1.0","eventSource":"aws:dynamodb","awsRegion":"ap-northeast-1","dynamodb":{"keys":{"id":{"N":"2"}},"newImage":{"id":{"N":"2"},"Name":{"S":"kaji"}},"sequenceNumber":"800000000000042797107","sizeBytes":16,"streamViewType":"NEW_AND_OLD_IMAGES"}}
2015-08-18T01:46:08.330Z ip-172-31-14-208 {"eventID":"0bc3b5a26beaa0dacd48e08ec2e65e3a","eventName":"INSERT","eventVersion":"1.0","eventSource":"aws:dynamodb","awsRegion":"ap-northeast-1","dynamodb":{"keys":{"id":{"N":"3"}},"newImage":{"id":{"N":"3"},"Name":{"S":"watanabe"}},"sequenceNumber":"900000000000042797189","sizeBytes":20,"streamViewType":"NEW_AND_OLD_IMAGES"}}

Elasticsearchの状況を確認してみましょう。以下のように、"dynamodb"というindexが作成されています。

$ curl -XGET localhost:9200/_cat/indices?v
health status index               pri rep docs.count docs.deleted store.size pri.store.size
yellow open   dynamodb              5   1          3            0     13.2kb         13.2kb

データを検索してみます。

$ curl -XGET localhost:9200/dynamodb/member/_search?pretty -d '
  {
      "query" : {
          "match_all" : {}
      }
  }'

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "dynamodb",
      "_type" : "member",
      "_id" : "AU8-fChSn0awDhzz_8Z5",
      "_score" : 1.0,
      "_source":{"message":"{\"eventID\":\"0bc3b5a26beaa0dacd48e08ec2e65e3a\",\"eventName\":\"INSERT\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"3\"}},\"newImage\":{\"id\":{\"N\":\"3\"},\"Name\":{\"S\":\"watanabe\"}},\"sequenceNumber\":\"900000000000042797189\",\"sizeBytes\":20,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T01:46:08.330Z"}
    }, {
      "_index" : "dynamodb",
      "_type" : "member",
      "_id" : "AU8-fAJBn0awDhzz_8Z3",
      "_score" : 1.0,
      "_source":{"message":"{\"eventID\":\"6eb45a35ecd59bd577e54258ed4ad0b6\",\"eventName\":\"INSERT\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"1\"}},\"newImage\":{\"id\":{\"N\":\"1\"},\"Name\":{\"S\":\"sasaki\"}},\"sequenceNumber\":\"700000000000042797024\",\"sizeBytes\":18,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T01:45:58.369Z"}
    }, {
      "_index" : "dynamodb",
      "_type" : "member",
      "_id" : "AU8-fBTIn0awDhzz_8Z4",
      "_score" : 1.0,
      "_source":{"message":"{\"eventID\":\"02c0d03dfa5bc977c92111568b5fe552\",\"eventName\":\"INSERT\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"2\"}},\"newImage\":{\"id\":{\"N\":\"2\"},\"Name\":{\"S\":\"kaji\"}},\"sequenceNumber\":\"800000000000042797107\",\"sizeBytes\":16,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T01:46:03.330Z"}
    } ]
  }
}

ちゃんと3件ともデータが入っていますね!

ついでに、DynamoDBからデータの削除もしてみます。

$ aws dynamodb delete-item --table-name member --key '{"id":{"N":"1"}}'

すると以下のように、"eventName"が"REMOVE"となったデータが追加されます。

$ curl -XGET localhost:9200/dynamodb/member/_search?pretty -d '
  {
      "query" : {
        "query_string": {
            "query": "REMOVE"
        }
      }
  }'

{
  "took" : 17298,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 0.038356602,
    "hits" : [ {
      "_index" : "dynamodb",
      "_type" : "member",
      "_id" : "AU8-iQZTn0awDhzz_8aA",
      "_score" : 0.038356602,
      "_source":{"message":"{\"eventID\":\"9278444ed6808f079caf8573017a7b33\",\"eventName\":\"REMOVE\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"1\"}},\"oldImage\":{\"id\":{\"N\":\"1\"},\"Name\":{\"S\":\"sasaki\"}},\"sequenceNumber\":\"1000000000000042808296\",\"sizeBytes\":18,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T02:00:11.598Z"}
    } ]
  }
}

さいごに

これまではアプリケーション側からDynamoDBとElasticsearchの両方にデータをPUTしていたものが、このプラグインを使うことで、DynamoDBだけにPUTすればよくなります。更に既にDynamoDBを使っていれば、アプリケーションの改修を行うことなく、Elasticsearchによる全文検索システムを追加構築することが出来るようになります。便利!