Logstash Plugin for Amazon DynamoDBを使ってDynamoDBとElasticsearchを同期する
はじめに
AWS Official Blogにて、Logstash Plugin for Amazon DynamoDBという面白いツールが発表されました!
要は、Amazon DynamoDBに入れたデータをLogstashのpluginを使ってDynamoDB streamsから取得します。LogstashなのでそのままElasticsearchにデータを突っ込めます。そうするとDynamoDBとElasticsearchのデータが同期できます。便利!
ということで早速やってみました!
準備
DynamoDBのセットアップ
新規にDynamoDBテーブルを作成します。テーブル名は"member"、Primary Key TypeはHash、KeyはNumberで"id"という名前にしました。
このDynamoDBテーブルではStreamsを有効にします。View Typeは"New and Old Images"を指定します。
Logstashのインストール
Logstashはjavaで動くので、javaが入ってることを確認します。
$ java -version java version "1.7.0_85" OpenJDK Runtime Environment (amzn-2.6.1.3.61.amzn1-x86_64 u85-b01) OpenJDK 64-Bit Server VM (build 24.85-b03, mixed mode)
次にLogstashの公式Webサイトからrpmをゲットしてインストールします。
$ wget https://download.elastic.co/logstash/logstash/packages/centos/logstash-1.5.3-1.noarch.rpm $ sudo rpm -ivh logstash-1.5.3-1.noarch.rpm
インストールされたことを確認します。
$ /opt/logstash/bin/logstash --version logstash 1.5.3
この状態だとLogstashのバイナリがあるディレクトリにPATHが通っていません。この後の作業のためにPATHを通しておきます。
$ export PATH=$PATH:/opt/logstash/bin
JRubyのインストール
Logstash Plugin for Amazon DynamoDBは、関連しているライブラリがJRubyのみをサポートしている関係で、JRubyが使える環境にする必要があります。まずはRVMをインストールします。
$ curl -O https://raw.githubusercontent.com/rvm/rvm/master/binscripts/rvm-installer $ curl -sSL https://rvm.io/mpapis.asc | gpg2 --import - $ sudo bash rvm-installer stable
RVM用の環境変数を設定した上で、JRubyをインストールします。
$ source ~/.profile $ rvm install jruby
JRubyを使うように設定します。
$ rvm use jruby-1.7.19 Using /home/ec2-user/.rvm/gems/jruby-1.7.19
Logstash Plugin for Amazon DynamoDBのインストール
まず最初にbundlerをインストールしておきます。
$ gem install bundler
Logstash Plugin for Amazon DynamoDBのGitリポジトリをcloneします。
$ git clone https://github.com/awslabs/logstash-input-dynamodb $ cd ./logstash-input-dynamodb
そしてbundle install
します。
$ bundle install Bundle complete! 3 Gemfile dependencies, 40 gems now installed. Use `bundle show [gemname]` to see where a bundled gem is installed.
gem build
してPluginインストール用のgemファイルを作成します。
$ gem build logstash-input-dynamodb.gemspec Successfully built RubyGem Name: logstash-input-dynamodb Version: 0.1.0 File: logstash-input-dynamodb-0.1.0.gem
出来上がったgemファイルをplugin
コマンドでインストールします。
$ sudo /opt/logstash/bin/plugin install --no-verify ./logstash-input-dynamodb-0.1.0.gem Installation successful
Elasticsearchのインストール
最新のElasticsearchのrpmファイルをゲットし、インストールします。
$ wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.noarch.rpm $ sudo yum install elasticsearch-1.7.1.noarch.rpm
そして起動。
$ sudo service elasticsearch start Starting elasticsearch: [ OK ]
これで準備は完了です。
やってみた
logstashをLogstash Plugin for Amazon DynamoDBを使う形で起動します。
$ logstash -e 'input { dynamodb{endpoint => "dynamodb.ap-northeast-1.amazonaws.com" streams_endpoint => "streams.dynamodb.ap-northeast-1.amazonaws.com" view_type => "new_and_old_images" aws_access_key_id => "<YOUR_ACCESS_KEY>" aws_secret_access_key => "<YOUR_SECRET_KEY>" table_name => "member"} } output { elasticsearch { host => localhost port => "9200" protocol => "http" index => "dynamodb" document_type => "member" } stdout { } }'
これでDynamoDBのmemberテーブルに入れたデータを、LogstashがDynamoDB streams経由で取得し、localhostにあるElasticsearchに投入します。ではDynamoDBのmemberテーブルにデータを投入してみます。
$ aws dynamodb put-item --table-name member --item '{"id":{"N":"1"},"Name":{"S":"sasaki"}}' $ aws dynamodb put-item --table-name member --item '{"id":{"N":"2"},"Name":{"S":"kaji"}}' $ aws dynamodb put-item --table-name member --item '{"id":{"N":"3"},"Name":{"S":"watanabe"}}'
Logstashでは以下のようなイベントログが出ます。
2015-08-18T01:45:58.369Z ip-172-31-14-208 {"eventID":"6eb45a35ecd59bd577e54258ed4ad0b6","eventName":"INSERT","eventVersion":"1.0","eventSource":"aws:dynamodb","awsRegion":"ap-northeast-1","dynamodb":{"keys":{"id":{"N":"1"}},"newImage":{"id":{"N":"1"},"Name":{"S":"sasaki"}},"sequenceNumber":"700000000000042797024","sizeBytes":18,"streamViewType":"NEW_AND_OLD_IMAGES"}} 2015-08-18T01:46:03.330Z ip-172-31-14-208 {"eventID":"02c0d03dfa5bc977c92111568b5fe552","eventName":"INSERT","eventVersion":"1.0","eventSource":"aws:dynamodb","awsRegion":"ap-northeast-1","dynamodb":{"keys":{"id":{"N":"2"}},"newImage":{"id":{"N":"2"},"Name":{"S":"kaji"}},"sequenceNumber":"800000000000042797107","sizeBytes":16,"streamViewType":"NEW_AND_OLD_IMAGES"}} 2015-08-18T01:46:08.330Z ip-172-31-14-208 {"eventID":"0bc3b5a26beaa0dacd48e08ec2e65e3a","eventName":"INSERT","eventVersion":"1.0","eventSource":"aws:dynamodb","awsRegion":"ap-northeast-1","dynamodb":{"keys":{"id":{"N":"3"}},"newImage":{"id":{"N":"3"},"Name":{"S":"watanabe"}},"sequenceNumber":"900000000000042797189","sizeBytes":20,"streamViewType":"NEW_AND_OLD_IMAGES"}}
Elasticsearchの状況を確認してみましょう。以下のように、"dynamodb"というindexが作成されています。
$ curl -XGET localhost:9200/_cat/indices?v health status index pri rep docs.count docs.deleted store.size pri.store.size yellow open dynamodb 5 1 3 0 13.2kb 13.2kb
データを検索してみます。
$ curl -XGET localhost:9200/dynamodb/member/_search?pretty -d ' { "query" : { "match_all" : {} } }' { "took" : 2, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 3, "max_score" : 1.0, "hits" : [ { "_index" : "dynamodb", "_type" : "member", "_id" : "AU8-fChSn0awDhzz_8Z5", "_score" : 1.0, "_source":{"message":"{\"eventID\":\"0bc3b5a26beaa0dacd48e08ec2e65e3a\",\"eventName\":\"INSERT\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"3\"}},\"newImage\":{\"id\":{\"N\":\"3\"},\"Name\":{\"S\":\"watanabe\"}},\"sequenceNumber\":\"900000000000042797189\",\"sizeBytes\":20,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T01:46:08.330Z"} }, { "_index" : "dynamodb", "_type" : "member", "_id" : "AU8-fAJBn0awDhzz_8Z3", "_score" : 1.0, "_source":{"message":"{\"eventID\":\"6eb45a35ecd59bd577e54258ed4ad0b6\",\"eventName\":\"INSERT\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"1\"}},\"newImage\":{\"id\":{\"N\":\"1\"},\"Name\":{\"S\":\"sasaki\"}},\"sequenceNumber\":\"700000000000042797024\",\"sizeBytes\":18,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T01:45:58.369Z"} }, { "_index" : "dynamodb", "_type" : "member", "_id" : "AU8-fBTIn0awDhzz_8Z4", "_score" : 1.0, "_source":{"message":"{\"eventID\":\"02c0d03dfa5bc977c92111568b5fe552\",\"eventName\":\"INSERT\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"2\"}},\"newImage\":{\"id\":{\"N\":\"2\"},\"Name\":{\"S\":\"kaji\"}},\"sequenceNumber\":\"800000000000042797107\",\"sizeBytes\":16,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T01:46:03.330Z"} } ] } }
ちゃんと3件ともデータが入っていますね!
ついでに、DynamoDBからデータの削除もしてみます。
$ aws dynamodb delete-item --table-name member --key '{"id":{"N":"1"}}'
すると以下のように、"eventName"が"REMOVE"となったデータが追加されます。
$ curl -XGET localhost:9200/dynamodb/member/_search?pretty -d ' { "query" : { "query_string": { "query": "REMOVE" } } }' { "took" : 17298, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 1, "max_score" : 0.038356602, "hits" : [ { "_index" : "dynamodb", "_type" : "member", "_id" : "AU8-iQZTn0awDhzz_8aA", "_score" : 0.038356602, "_source":{"message":"{\"eventID\":\"9278444ed6808f079caf8573017a7b33\",\"eventName\":\"REMOVE\",\"eventVersion\":\"1.0\",\"eventSource\":\"aws:dynamodb\",\"awsRegion\":\"ap-northeast-1\",\"dynamodb\":{\"keys\":{\"id\":{\"N\":\"1\"}},\"oldImage\":{\"id\":{\"N\":\"1\"},\"Name\":{\"S\":\"sasaki\"}},\"sequenceNumber\":\"1000000000000042808296\",\"sizeBytes\":18,\"streamViewType\":\"NEW_AND_OLD_IMAGES\"}}","host":"ip-172-31-14-208","@version":"1","@timestamp":"2015-08-18T02:00:11.598Z"} } ] } }
さいごに
これまではアプリケーション側からDynamoDBとElasticsearchの両方にデータをPUTしていたものが、このプラグインを使うことで、DynamoDBだけにPUTすればよくなります。更に既にDynamoDBを使っていれば、アプリケーションの改修を行うことなく、Elasticsearchによる全文検索システムを追加構築することが出来るようになります。便利!