【新機能】Amazon Kinesis Firehoseの出力先にAmazon Elasticsearch Serviceが追加されました
はじめに
藤本です。
本日、AWS Summit Chicagoが開催され、いくつかのサービスのGA、新機能が発表されました。
本エントリではKinesis Firehoseの新しい出力先として追加されたAmazon Elasticsarch Serviceとの連携を触ってみましたのでご紹介します。
概要
Amazon Kinsis Firehoseは今まで出力先にS3、Redshiftと対応していましたが、今回の発表でAmazon Elasticsearch Serviceが対応しました。
- 今まで
- S3
- Redshift
- 追加
- Amazon ES
やってみた
Amazon Elasticsearch Serviceドメイン作成
Amazon Elasticsearch Serviceのドメイン作成は[新機能]Amazon Elasticsearch Serviceがリリースされました!を参照。
Amazon Kinesis FirehoseのDelivery Streams作成
※ Amazon Kinesis Firehoseは現在バージニア、オレゴン、アイルランドのリージョンのみリリースされています。(東京リージョンのリリースが待ち遠しい)
作成ウィザード
Kinesis Firehoseの画面からCreate Delivery Systemをクリックします。
Destinationに「Amazon Elasticsearch Service」が追加されています!
出力先設定
出力先の情報を指定します。
簡単に設定項目の内容をご説明します。
- Delivery stream name : この定義の名前
- Elasticsearch domain : 出力先のESドメイン(同一アカウントに作成したElasticsearch Serviceドメインがプルダウンで表示される)
- Index : Elasticsearchのインデックス名プレフィックス(下のIndex rotationが付与された日付が実際のIndexとなる)必要に応じて事前にIndex Templateを作成しておくと良いでしょう。
- Index rotation : インデックスのローテーションルール(毎時、日次、週次、月次、ローテーションなし)
- Type : Indexのタイプ名
- Retry duration : 出力のリトライ間隔
- Backup mode : Amazon ESに出力すると同時にS3に出力します。失敗したデータのみか、全てのデータか指定可能。失敗したデータのみにしておき、後でS3から取り出して、リトライすることでデータの損失が防げるのは嬉しいです。
- S3 bucket : 出力先のS3バケット名
- S3 prefix : バケット配下のパスのプレフィックス
詳細設定
詳細情報を設定します。
こちらも簡単に設定項目の内容をご説明します。
- Buffer size : 出力する閾値となるサイズ
- Buffer interval : 出力する閾値となる時間
- Data compression : S3に配置するデータの圧縮要否、圧縮形式
- Data encryption : S3に配置するデータの暗号化要否
- Error logging : Amazon ES、S3エラー時のログ
- IAM Role : 割り当てるIAM Role。S3、AmazonES、CloudWatch Logsへのアクセス権限が必要となります。プルダウンから必要な権限を持つIAM Roleを作成してくれます。
圧縮の選択肢
暗号化の選択肢
設定確認
確認画面で設定内容を確認します。
一覧に作成した連携情報が表示されます。
動作確認
データ送信
データを送信します。
# python Python 2.7.10 (default, Jul 19 2015, 22:08:23) [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> import boto3 >>> fh = boto3.client('firehose', region_name='us-east-1') >>> fh.put_record(DeliveryStreamName='integration-elasticsearch', Record={"Data": '{"message": "firehose put record"}'}) {u'RecordId': u'BblOWOueryOWTZyi+ngmRd4FVNpjvVCBCou7uxC1XrKS+IEh8HIc6fks3VTAx4/gvKVFZWjhXpU05uMZarwwPUkDOLrFDYAndR7T8mT0zfZM46i9ae32YNAdxdYmFd8UHZsgI9lXSRUzG4adeipqhW+0OH6QXpNqVx7kyXSioUbbwO+JWOpajSxPiNEr/THPBD+1VV/62hL69MR9nABV72MbzD2MOC7U', 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'a7b80611-06ae-11e6-af85-5b954446c476'}}
Firehoseでの確認
MonitoringからFirehoseで受信したデータ、S3へ出力したデータ、AmazonESへ出力したデータをグラフ化されます。
下のグラフでFirehoseがデータを受信したこと、AmazonESにデータを出力したことが分かります。
AmazonESでの確認
AmazonESのドメインでインデックスの情報を確認します。
Index名、Type名が設定されて文字列となっています。Countも増えています。
Kibanaでのデータ確認
Kibanaへ接続し、Indexを登録します。
Discoveryから登録したデータを確認します。
JSONで登録したデータからmessageがフィールドとして認識されています。
あ、タイムスタンプは入らないのかー、タイムスタンプ入らんのかー、Kinesis側でタイムスタンプ入れてほしいなぁ。
まとめ
いかがでしたでしょうか?
非常に簡単にセットアップできました。Kinesisというスケールしてくれる大きな受け皿で送信する側はあまり負荷を気にせず送ることができ、Amazon ESの負荷に関しても失敗したものはS3にバックアップできるので、負荷が低い時に改めてAWS LambdaなどでS3からファイルを取得して、Amazon ESに流し込めば、データの損失も可能性も低くすることができます。スゴく嬉しい。
こちらはスゴく待ち望んだ機能でした。
今まではKinesis Streamから取り出す処理を実装したり、Firehoseで一度S3に出力してから、AWS Lambdaで取得する実装をしていました。