fluent-plugin-kinesisでKinesis Streamsにログを送信する
EC2インスタンスからfluentdを使ってストリームデータの収集・処理基盤Kinesis Streamsにデータ送信する方法を紹介します。
Amazon Kinesis Streams とは
Amazon Kinesis Streams はデータレコードの大量のストリームをリアルタイムで収集し、処理する Amazonのマネージド・サービスです。
EC2インスタンスなどのプロデューサーは継続的にデータを Streams にプッシュし、EMR/Lambda/Kinesis アプリケーションといったコンシューマーがリアルタイムでデータを処理します。
本記事のゴール
EC2インスタンスから fluentd の Kinesis Stream へのアウトプットプラグイン fluent-plugin-kinesis を使って、Apacheサーバーのログを Kinesis Stream に送信します。
EMRや Lambda を使ったプロデューサーの実装は各ドキュメントを参照ください。
検証環境
- OS : Amazon Linux 2015-09
- fluentd : td-agent 0.12.20
- fluent-plugin-kinesis : 0.4.0
EC2 に IAM Role の設定
サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "kinesis:*", "Resource": [ "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*" ] } ] }
ポリシーの詳細は次のドキュメントを確認お願いします。
http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html
Kinesis Stream の設定
Kinesis Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。
$ STREAM_NAME=test $ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 1 $ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus` becomes `ACTIVE` $ aws kinesis describe-stream --stream-name $STREAM_NAME { "StreamDescription": { "RetentionPeriodHours": 24, "StreamStatus": "ACTIVE", "StreamName": "test", "StreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/test", "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559738018025937878454287328020612751047645945476939778" } } ] } }
Apache のインストール
今回は Apache のログを Kinesis Stream に送信します。 Apache をインストールします。
$ sudo yum install -y httpd $ sudo service httpd start Starting httpd: [ OK ]
HTTP アクセスして、Apache が起動していることを確認しましょう。
$ curl localhost/
fluentd の設定
最後に fluentd の設定を行います。
fluentd のインストール
fluentd の安定版である td-agent をインストールします。
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh ... Installed: td-agent.x86_64 0:2.3.1-0.el2015 Complete!
fluentd の Kinesis Stream プラグインのインストール
Kinesis Stream プラグインとして fluent-plugin-kinesis をインストールします。
$ sudo td-agent-gem install fluent-plugin-kinesis Fetching: fluent-plugin-kinesis-0.4.0.gem (100%) Successfully installed fluent-plugin-kinesis-0.4.0 Parsing documentation for fluent-plugin-kinesis-0.4.0 Installing ri documentation for fluent-plugin-kinesis-0.4.0 Done installing documentation for fluent-plugin-kinesis after 0 seconds 1 gem installed
fluentd設定ファイルの修正
/etc/td-agent/td-agent.conf
にある設定ファイルを修正します。
アクセスログファイル/var/log/httpd/access_log
を tail
し、書き込まれたデータを Kinesis Stream に送信します。
データソースの定義が <source>
タグ、データ処理の定義が <match>
タグです。
$ cat /etc/td-agent/td-agent.conf <source> type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> type kinesis stream_name test region ap-northeast-1 random_partition_key true use_yajl true </match>
設定の肝は2箇所あります。
Kinesis Streamが利用するpartition key
Kinesis Stream にデータ送信するときの partition key です。
Kinesis Stream はシャードごとに処理能力が決まっており、シャード数を増やすことでスケールアウトします。Kinesis Stream が受け取ったデータは、基本的に partition key のMD5ハッシュ値を元にシャード分散されます。 そのため、partition key が一様でないと、データがルーティングされるシャードが偏ってしまい、Kinesis Stream 全体としてのスループットが上がりません。
fluent-plugin-kinesis は partition key をカスタマイズできますが、 特異な要件が存在しないかぎり、random_partition_key true
として偏りのない parition key が生成されるようにしましょう。内部的には UUID V4 が使われています。
use_yajl
入力データにマルチバイト文字が含まれる場合、to_json
の JSON へのフォーマット変換時にEncoding::UndefinedConversionError
エラーが発生します。
マルチバイトをKinesis Streamsに送信するときはこのオプションを有効にしましょう。
use_yajl
Boolean, default is false. In case you find error Encoding::UndefinedConversionError with multibyte texts, you can avoid that error with this option. https://github.com/awslabs/aws-fluent-plugin-kinesis#use_yajl
設定ファイルのシンタックスチェック
設定ファイルのシンタックスチェックをしましょう。
$ td-agent --dry-run -c /etc/td-agent/td-agent.conf 2016-03-03 14:51:29 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf" 2016-03-03 14:51:29 +0000 [info]: starting fluentd-0.12.20 as dry run mode 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-kinesis' version '0.4.0' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2' 2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1' 2016-03-03 14:51:29 +0000 [info]: gem 'fluentd' version '0.12.20' 2016-03-03 14:51:29 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis" 2016-03-03 14:51:29 +0000 [info]: adding source type="tail" 2016-03-03 14:51:29 +0000 [info]: using configuration file: <ROOT> <source> type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> type kinesis stream_name test region ap-northeast-1 random_partition_key true use_yajl true </match> </ROOT> $ echo $? 0
エラーメッセージが表示されず、ステータスコードも正常の「0」です。
fluentd の起動
fluentd をフォアグラウンド実行してみましょう。
$ sudo td-agent -c /etc/td-agent/td-agent.conf 2016-03-03 14:51:45 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf" 2016-03-03 14:51:45 +0000 [info]: starting fluentd-0.12.20 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-kinesis' version '0.4.0' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2' 2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1' 2016-03-03 14:51:45 +0000 [info]: gem 'fluentd' version '0.12.20' 2016-03-03 14:51:45 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis" 2016-03-03 14:51:45 +0000 [info]: adding source type="tail" 2016-03-03 14:51:45 +0000 [info]: using configuration file: <ROOT> <source> type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> type kinesis stream_name test region ap-northeast-1 random_partition_key true </match> </ROOT> 2016-03-03 14:51:45 +0000 [info]: following tail of /var/log/httpd/access_log
Apache にリクエストをして、Kinesis Stream に送信するデータを生成します。
$ curl localhost
しばらく動かしてみて、問題なさそうなら、 fluentd をバックグラウンド実行しましょう。
$ sudo service td-agent start Starting td-agent: [ OK ]
Kinesis Stream のデータ確認
最後に、Kinesis Stream に送信したデータを確認します。Kinesis Stream からストリームデータを取得するには
- シャードに対して、ストリームの位置を要求(
get-shard-iterator
) - シャードの特定の位置以降のデータを要求(
get-shards
)
の2段階のプロセスが必要です。
実際にやってみましょう。
$ STREAM_NAME=test $ aws kinesis get-shard-iterator \ --shard-id shardId-000000000000 \ --stream-name $STREAM_NAME \ --shard-iterator-type TRIM_HORIZON { "ShardIterator": "AAAAAAAAAAHgI6Tsl..." } $ aws kinesis get-records --shard-iterator "AAAAAAAAAAHgI6Tsl..." { "Records": [ { "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MDo1NloiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=", "PartitionKey": "129d3dc4-c193-4b13-9ab7-7153a4a1893d", "ApproximateArrivalTimestamp": 1457016681.798, "SequenceNumber": "49559738018025937878454287328199533772350652157580541954" }, { "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MjoyN1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=", "PartitionKey": "62976053-2211-40f1-ac4e-0242c240b0fe", "ApproximateArrivalTimestamp": 1457016765.424, "SequenceNumber": "49559738018025937878454287328203160549809501817540706306" }, { "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii9mb28iLCJjb2RlIjo0MDQsInNpemUiOjI3NiwicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJjdXJsLzcuNDAuMCIsInRpbWUiOiIyMDE2LTAzLTAzVDE0OjUyOjQ4WiIsInRhZyI6ImxvZy5odHRwZC5hY2Nlc3MifQ==", "PartitionKey": "52fa118b-6d37-483b-b36a-7d70578b5661", "ApproximateArrivalTimestamp": 1457016825.286, "SequenceNumber": "49559738018025937878454287328204369475629120569884016642" }, { "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MzowN1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=", "PartitionKey": "4ddbdf0e-af2f-4a62-85af-80c8cf243725", "ApproximateArrivalTimestamp": 1457016825.292, "SequenceNumber": "49559738018025937878454287328205578401448735199058722818" }, { "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiUE9TVCIsInBhdGgiOiIvIiwiY29kZSI6NDA0LCJzaXplIjoyNzMsInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MzoyM1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=", "PartitionKey": "59792b53-dd87-4908-b81e-37d45fd3ed89", "ApproximateArrivalTimestamp": 1457016825.292, "SequenceNumber": "49559738018025937878454287328206787327268349828233428994" } ], "NextShardIterator": "AAAAAAAAAAGNLrvu6z30rF6F3Fd2s/F1DNfET2t+awpzUpgrOzmcziRoHyoYW1PcoapF1oTZHtMZhKxWS7GS53L+Aer8nj9fJB33odOa61WNlLh4HKXS22/G/E13ltpTFuan7kbE0ftfovlk+rSrzDbhC8lJQaPjw2HD0B7O0rpYDiXF3awoA3H1RsuN1GwiajEk2qTa2rXHaDJ9kdF8pogzAGbLmu9C", "MillisBehindLatest": 0 }
実際のデータは base64 エンコードされているので、デコードして中身を確認します。
$ DATA="eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MDo1NloiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=" $ echo -n $DATA | base64 -d {"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":403,"size":3839,"referer":null,"agent":"curl/7.40.0","time":"2016-03-03T14:50:56Z","tag":"log.httpd.access"}
確かにApacheログですね。
まとめ
今回は fluentd から Kinesis Stream にデータ送信する方法を紹介しました。 ログを集約したいという要求に対して、すべてを自前で構築するのは多くの困難が伴います。
可用性もデータ耐久性を兼ね備えたログ集約サーバーとしてAWSフルマネージドな Kinesis Stream を採用し、ログデータ送信には fluentd を採用すると、簡単にログ集約システムを構築できます。
Kinesis には、今回紹介した Kinesis Stream の他にも、S3/Redshift 連携を自動でやってくれる Kinesis Firehose も存在します。fluentdとKinesis Firehoseの連携は次の記事を参照ください。
fluent-plugin-kinesis-firehoseでAmazon Kinesis Firehoseにログを転送する | Developers.IO
参考リンク
- https://aws.amazon.com/kinesis/
- https://docs.treasuredata.com/articles/td-agent
- https://github.com/awslabs/aws-fluent-plugin-kinesis