この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
EC2インスタンスからfluentdを使ってストリームデータの収集・処理基盤Kinesis Streamsにデータ送信する方法を紹介します。
Amazon Kinesis Streams とは
Amazon Kinesis Streams はデータレコードの大量のストリームをリアルタイムで収集し、処理する Amazonのマネージド・サービスです。
EC2インスタンスなどのプロデューサーは継続的にデータを Streams にプッシュし、EMR/Lambda/Kinesis アプリケーションといったコンシューマーがリアルタイムでデータを処理します。
本記事のゴール
EC2インスタンスから fluentd の Kinesis Stream へのアウトプットプラグイン fluent-plugin-kinesis を使って、Apacheサーバーのログを Kinesis Stream に送信します。
EMRや Lambda を使ったプロデューサーの実装は各ドキュメントを参照ください。
検証環境
- OS : Amazon Linux 2015-09
- fluentd : td-agent 0.12.20
- fluent-plugin-kinesis : 0.4.0
EC2 に IAM Role の設定
サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": [
"arn:aws:kinesis:ap-northeast-1:123456789012:stream/*"
]
}
]
}
ポリシーの詳細は次のドキュメントを確認お願いします。
http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html
Kinesis Stream の設定
Kinesis Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。
$ STREAM_NAME=test
$ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 1
$ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus` becomes `ACTIVE`
$ aws kinesis describe-stream --stream-name $STREAM_NAME
{
"StreamDescription": {
"RetentionPeriodHours": 24,
"StreamStatus": "ACTIVE",
"StreamName": "test",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/test",
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49559738018025937878454287328020612751047645945476939778"
}
}
]
}
}
Apache のインストール
今回は Apache のログを Kinesis Stream に送信します。 Apache をインストールします。
$ sudo yum install -y httpd
$ sudo service httpd start
Starting httpd: [ OK ]
HTTP アクセスして、Apache が起動していることを確認しましょう。
$ curl localhost/
fluentd の設定
最後に fluentd の設定を行います。
fluentd のインストール
fluentd の安定版である td-agent をインストールします。
$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
...
Installed:
td-agent.x86_64 0:2.3.1-0.el2015
Complete!
fluentd の Kinesis Stream プラグインのインストール
Kinesis Stream プラグインとして fluent-plugin-kinesis をインストールします。
$ sudo td-agent-gem install fluent-plugin-kinesis
Fetching: fluent-plugin-kinesis-0.4.0.gem (100%)
Successfully installed fluent-plugin-kinesis-0.4.0
Parsing documentation for fluent-plugin-kinesis-0.4.0
Installing ri documentation for fluent-plugin-kinesis-0.4.0
Done installing documentation for fluent-plugin-kinesis after 0 seconds
1 gem installed
fluentd設定ファイルの修正
/etc/td-agent/td-agent.conf
にある設定ファイルを修正します。
アクセスログファイル/var/log/httpd/access_log
を tail
し、書き込まれたデータを Kinesis Stream に送信します。
データソースの定義が <source>
タグ、データ処理の定義が <match>
タグです。
$ cat /etc/td-agent/td-agent.conf
<source>
type tail
format apache2
path /var/log/httpd/access_log
pos_file /var/log/td-agent/httpd-access.pos
tag log.httpd.access
</source>
<match log.httpd.*>
type kinesis
stream_name test
region ap-northeast-1
random_partition_key true
use_yajl true
</match>
設定の肝は2箇所あります。
Kinesis Streamが利用するpartition key
Kinesis Stream にデータ送信するときの partition key です。
Kinesis Stream はシャードごとに処理能力が決まっており、シャード数を増やすことでスケールアウトします。Kinesis Stream が受け取ったデータは、基本的に partition key のMD5ハッシュ値を元にシャード分散されます。 そのため、partition key が一様でないと、データがルーティングされるシャードが偏ってしまい、Kinesis Stream 全体としてのスループットが上がりません。
fluent-plugin-kinesis は partition key をカスタマイズできますが、 特異な要件が存在しないかぎり、random_partition_key true
として偏りのない parition key が生成されるようにしましょう。内部的には UUID V4 が使われています。
use_yajl
入力データにマルチバイト文字が含まれる場合、to_json
の JSON へのフォーマット変換時にEncoding::UndefinedConversionError
エラーが発生します。
マルチバイトをKinesis Streamsに送信するときはこのオプションを有効にしましょう。
use_yajl
Boolean, default is false. In case you find error Encoding::UndefinedConversionError with multibyte texts, you can avoid that error with this option. https://github.com/awslabs/aws-fluent-plugin-kinesis#use_yajl
設定ファイルのシンタックスチェック
設定ファイルのシンタックスチェックをしましょう。
$ td-agent --dry-run -c /etc/td-agent/td-agent.conf
2016-03-03 14:51:29 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-03-03 14:51:29 +0000 [info]: starting fluentd-0.12.20 as dry run mode
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-kinesis' version '0.4.0'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-03-03 14:51:29 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-03-03 14:51:29 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis"
2016-03-03 14:51:29 +0000 [info]: adding source type="tail"
2016-03-03 14:51:29 +0000 [info]: using configuration file: <ROOT>
<source>
type tail
format apache2
path /var/log/httpd/access_log
pos_file /var/log/td-agent/httpd-access.pos
tag log.httpd.access
</source>
<match log.httpd.*>
type kinesis
stream_name test
region ap-northeast-1
random_partition_key true
use_yajl true
</match>
</ROOT>
$ echo $?
0
エラーメッセージが表示されず、ステータスコードも正常の「0」です。
fluentd の起動
fluentd をフォアグラウンド実行してみましょう。
$ sudo td-agent -c /etc/td-agent/td-agent.conf
2016-03-03 14:51:45 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-03-03 14:51:45 +0000 [info]: starting fluentd-0.12.20
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-kinesis' version '0.4.0'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-03-03 14:51:45 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-03-03 14:51:45 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis"
2016-03-03 14:51:45 +0000 [info]: adding source type="tail"
2016-03-03 14:51:45 +0000 [info]: using configuration file: <ROOT>
<source>
type tail
format apache2
path /var/log/httpd/access_log
pos_file /var/log/td-agent/httpd-access.pos
tag log.httpd.access
</source>
<match log.httpd.*>
type kinesis
stream_name test
region ap-northeast-1
random_partition_key true
</match>
</ROOT>
2016-03-03 14:51:45 +0000 [info]: following tail of /var/log/httpd/access_log
Apache にリクエストをして、Kinesis Stream に送信するデータを生成します。
$ curl localhost
しばらく動かしてみて、問題なさそうなら、 fluentd をバックグラウンド実行しましょう。
$ sudo service td-agent start
Starting td-agent: [ OK ]
Kinesis Stream のデータ確認
最後に、Kinesis Stream に送信したデータを確認します。Kinesis Stream からストリームデータを取得するには
- シャードに対して、ストリームの位置を要求(
get-shard-iterator
) - シャードの特定の位置以降のデータを要求(
get-shards
)
の2段階のプロセスが必要です。
実際にやってみましょう。
$ STREAM_NAME=test
$ aws kinesis get-shard-iterator \
--shard-id shardId-000000000000 \
--stream-name $STREAM_NAME \
--shard-iterator-type TRIM_HORIZON
{
"ShardIterator": "AAAAAAAAAAHgI6Tsl..."
}
$ aws kinesis get-records --shard-iterator "AAAAAAAAAAHgI6Tsl..."
{
"Records": [
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MDo1NloiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
"PartitionKey": "129d3dc4-c193-4b13-9ab7-7153a4a1893d",
"ApproximateArrivalTimestamp": 1457016681.798,
"SequenceNumber": "49559738018025937878454287328199533772350652157580541954"
},
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MjoyN1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
"PartitionKey": "62976053-2211-40f1-ac4e-0242c240b0fe",
"ApproximateArrivalTimestamp": 1457016765.424,
"SequenceNumber": "49559738018025937878454287328203160549809501817540706306"
},
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii9mb28iLCJjb2RlIjo0MDQsInNpemUiOjI3NiwicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJjdXJsLzcuNDAuMCIsInRpbWUiOiIyMDE2LTAzLTAzVDE0OjUyOjQ4WiIsInRhZyI6ImxvZy5odHRwZC5hY2Nlc3MifQ==",
"PartitionKey": "52fa118b-6d37-483b-b36a-7d70578b5661",
"ApproximateArrivalTimestamp": 1457016825.286,
"SequenceNumber": "49559738018025937878454287328204369475629120569884016642"
},
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MzowN1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
"PartitionKey": "4ddbdf0e-af2f-4a62-85af-80c8cf243725",
"ApproximateArrivalTimestamp": 1457016825.292,
"SequenceNumber": "49559738018025937878454287328205578401448735199058722818"
},
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiUE9TVCIsInBhdGgiOiIvIiwiY29kZSI6NDA0LCJzaXplIjoyNzMsInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MzoyM1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
"PartitionKey": "59792b53-dd87-4908-b81e-37d45fd3ed89",
"ApproximateArrivalTimestamp": 1457016825.292,
"SequenceNumber": "49559738018025937878454287328206787327268349828233428994"
}
],
"NextShardIterator": "AAAAAAAAAAGNLrvu6z30rF6F3Fd2s/F1DNfET2t+awpzUpgrOzmcziRoHyoYW1PcoapF1oTZHtMZhKxWS7GS53L+Aer8nj9fJB33odOa61WNlLh4HKXS22/G/E13ltpTFuan7kbE0ftfovlk+rSrzDbhC8lJQaPjw2HD0B7O0rpYDiXF3awoA3H1RsuN1GwiajEk2qTa2rXHaDJ9kdF8pogzAGbLmu9C",
"MillisBehindLatest": 0
}
実際のデータは base64 エンコードされているので、デコードして中身を確認します。
$ DATA="eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MDo1NloiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0="
$ echo -n $DATA | base64 -d
{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":403,"size":3839,"referer":null,"agent":"curl/7.40.0","time":"2016-03-03T14:50:56Z","tag":"log.httpd.access"}
確かにApacheログですね。
まとめ
今回は fluentd から Kinesis Stream にデータ送信する方法を紹介しました。 ログを集約したいという要求に対して、すべてを自前で構築するのは多くの困難が伴います。
可用性もデータ耐久性を兼ね備えたログ集約サーバーとしてAWSフルマネージドな Kinesis Stream を採用し、ログデータ送信には fluentd を採用すると、簡単にログ集約システムを構築できます。
Kinesis には、今回紹介した Kinesis Stream の他にも、S3/Redshift 連携を自動でやってくれる Kinesis Firehose も存在します。fluentdとKinesis Firehoseの連携は次の記事を参照ください。
fluent-plugin-kinesis-firehoseでAmazon Kinesis Firehoseにログを転送する | Developers.IO
参考リンク
- https://aws.amazon.com/kinesis/
- https://docs.treasuredata.com/articles/td-agent
- https://github.com/awslabs/aws-fluent-plugin-kinesis