fluent-plugin-kinesisでKinesis Streamsにログを送信する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

EC2インスタンスからfluentdを使ってストリームデータの収集・処理基盤Kinesis Streamsにデータ送信する方法を紹介します。

Amazon Kinesis Streams とは

Amazon Kinesis Streams はデータレコードの大量のストリームをリアルタイムで収集し、処理する Amazonのマネージド・サービスです。

EC2インスタンスなどのプロデューサーは継続的にデータを Streams にプッシュし、EMR/Lambda/Kinesis アプリケーションといったコンシューマーがリアルタイムでデータを処理します。

kinesis-fluentd

本記事のゴール

EC2インスタンスから fluentd の Kinesis Stream へのアウトプットプラグイン fluent-plugin-kinesis を使って、Apacheサーバーのログを Kinesis Stream に送信します。

EMRや Lambda を使ったプロデューサーの実装は各ドキュメントを参照ください。

検証環境

  • OS : Amazon Linux 2015-09
  • fluentd : td-agent 0.12.20
  • fluent-plugin-kinesis : 0.4.0

EC2 に IAM Role の設定

サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": [
                "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*"
            ]
        }
    ]
}

ポリシーの詳細は次のドキュメントを確認お願いします。

http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html

Kinesis Stream の設定

Kinesis Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。

$ STREAM_NAME=test
$ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 1
$ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus`  becomes `ACTIVE`
$ aws kinesis describe-stream --stream-name $STREAM_NAME
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamStatus": "ACTIVE",
        "StreamName": "test",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/test",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49559738018025937878454287328020612751047645945476939778"
                }
            }
        ]
    }
}

Apache のインストール

今回は Apache のログを Kinesis Stream に送信します。 Apache をインストールします。

$ sudo yum install -y httpd
$ sudo service httpd start
Starting httpd:                                            [  OK  ]

HTTP アクセスして、Apache が起動していることを確認しましょう。

$ curl localhost/

fluentd の設定

最後に fluentd の設定を行います。

fluentd のインストール

fluentd の安定版である td-agent をインストールします。

$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
...
Installed:
  td-agent.x86_64 0:2.3.1-0.el2015

Complete!

fluentd の Kinesis Stream プラグインのインストール

Kinesis Stream プラグインとして fluent-plugin-kinesis をインストールします。

$ sudo td-agent-gem install fluent-plugin-kinesis
Fetching: fluent-plugin-kinesis-0.4.0.gem (100%)
Successfully installed fluent-plugin-kinesis-0.4.0
Parsing documentation for fluent-plugin-kinesis-0.4.0
Installing ri documentation for fluent-plugin-kinesis-0.4.0
Done installing documentation for fluent-plugin-kinesis after 0 seconds
1 gem installed

fluentd設定ファイルの修正

/etc/td-agent/td-agent.conf にある設定ファイルを修正します。

アクセスログファイル/var/log/httpd/access_logtailし、書き込まれたデータを Kinesis Stream に送信します。

データソースの定義が <source> タグ、データ処理の定義が <match> タグです。

$ cat /etc/td-agent/td-agent.conf
<source>
  type tail
  format apache2
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd-access.pos
  tag log.httpd.access
</source>

<match log.httpd.*>
  type kinesis
  stream_name test
  region ap-northeast-1
  random_partition_key true
  use_yajl true
</match>

設定の肝は2箇所あります。

Kinesis Streamが利用するpartition key

Kinesis Stream にデータ送信するときの partition key です。

Kinesis Stream はシャードごとに処理能力が決まっており、シャード数を増やすことでスケールアウトします。Kinesis Stream が受け取ったデータは、基本的に partition key のMD5ハッシュ値を元にシャード分散されます。 そのため、partition key が一様でないと、データがルーティングされるシャードが偏ってしまい、Kinesis Stream 全体としてのスループットが上がりません。

fluent-plugin-kinesis は partition key をカスタマイズできますが、 特異な要件が存在しないかぎり、random_partition_key trueとして偏りのない parition key が生成されるようにしましょう。内部的には UUID V4 が使われています

use_yajl

入力データにマルチバイト文字が含まれる場合、to_json の JSON へのフォーマット変換時にEncoding::UndefinedConversionErrorエラーが発生します。 マルチバイトをKinesis Streamsに送信するときはこのオプションを有効にしましょう。

use_yajl

Boolean, default is false. In case you find error Encoding::UndefinedConversionError with multibyte texts, you can avoid that error with this option. https://github.com/awslabs/aws-fluent-plugin-kinesis#use_yajl

設定ファイルのシンタックスチェック

設定ファイルのシンタックスチェックをしましょう。

$ td-agent --dry-run -c /etc/td-agent/td-agent.conf
2016-03-03 14:51:29 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-03-03 14:51:29 +0000 [info]: starting fluentd-0.12.20 as dry run mode
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-kinesis' version '0.4.0'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-03-03 14:51:29 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-03-03 14:51:29 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-03-03 14:51:29 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis"
2016-03-03 14:51:29 +0000 [info]: adding source type="tail"
2016-03-03 14:51:29 +0000 [info]: using configuration file: <ROOT>
  <source>
    type tail
    format apache2
    path /var/log/httpd/access_log
    pos_file /var/log/td-agent/httpd-access.pos
    tag log.httpd.access
  </source>
  <match log.httpd.*>
    type kinesis
    stream_name test
    region ap-northeast-1
    random_partition_key true
    use_yajl true
  </match>
</ROOT>
$ echo $?
0

エラーメッセージが表示されず、ステータスコードも正常の「0」です。

fluentd の起動

fluentd をフォアグラウンド実行してみましょう。

$ sudo td-agent -c /etc/td-agent/td-agent.conf
2016-03-03 14:51:45 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-03-03 14:51:45 +0000 [info]: starting fluentd-0.12.20
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-kinesis' version '0.4.0'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-03-03 14:51:45 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-03-03 14:51:45 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-03-03 14:51:45 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis"
2016-03-03 14:51:45 +0000 [info]: adding source type="tail"
2016-03-03 14:51:45 +0000 [info]: using configuration file: <ROOT>
  <source>
    type tail
    format apache2
    path /var/log/httpd/access_log
    pos_file /var/log/td-agent/httpd-access.pos
    tag log.httpd.access
  </source>
  <match log.httpd.*>
    type kinesis
    stream_name test
    region ap-northeast-1
    random_partition_key true
  </match>
</ROOT>
2016-03-03 14:51:45 +0000 [info]: following tail of /var/log/httpd/access_log

Apache にリクエストをして、Kinesis Stream に送信するデータを生成します。

$ curl localhost

しばらく動かしてみて、問題なさそうなら、 fluentd をバックグラウンド実行しましょう。

$ sudo service td-agent start
Starting td-agent:                       [  OK  ]

Kinesis Stream のデータ確認

最後に、Kinesis Stream に送信したデータを確認します。Kinesis Stream からストリームデータを取得するには

  1. シャードに対して、ストリームの位置を要求(get-shard-iterator)
  2. シャードの特定の位置以降のデータを要求(get-shards)

の2段階のプロセスが必要です。

実際にやってみましょう。

$ STREAM_NAME=test
$ aws kinesis get-shard-iterator \
  --shard-id shardId-000000000000 \
  --stream-name $STREAM_NAME \
  --shard-iterator-type TRIM_HORIZON
{
    "ShardIterator": "AAAAAAAAAAHgI6Tsl..."
}
$ aws kinesis get-records --shard-iterator "AAAAAAAAAAHgI6Tsl..."
{
    "Records": [
        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MDo1NloiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
            "PartitionKey": "129d3dc4-c193-4b13-9ab7-7153a4a1893d",
            "ApproximateArrivalTimestamp": 1457016681.798,
            "SequenceNumber": "49559738018025937878454287328199533772350652157580541954"
        },
        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MjoyN1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
            "PartitionKey": "62976053-2211-40f1-ac4e-0242c240b0fe",
            "ApproximateArrivalTimestamp": 1457016765.424,
            "SequenceNumber": "49559738018025937878454287328203160549809501817540706306"
        },
        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii9mb28iLCJjb2RlIjo0MDQsInNpemUiOjI3NiwicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJjdXJsLzcuNDAuMCIsInRpbWUiOiIyMDE2LTAzLTAzVDE0OjUyOjQ4WiIsInRhZyI6ImxvZy5odHRwZC5hY2Nlc3MifQ==",
            "PartitionKey": "52fa118b-6d37-483b-b36a-7d70578b5661",
            "ApproximateArrivalTimestamp": 1457016825.286,
            "SequenceNumber": "49559738018025937878454287328204369475629120569884016642"
        },
        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MzowN1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
            "PartitionKey": "4ddbdf0e-af2f-4a62-85af-80c8cf243725",
            "ApproximateArrivalTimestamp": 1457016825.292,
            "SequenceNumber": "49559738018025937878454287328205578401448735199058722818"
        },
        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiUE9TVCIsInBhdGgiOiIvIiwiY29kZSI6NDA0LCJzaXplIjoyNzMsInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MzoyM1oiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0=",
            "PartitionKey": "59792b53-dd87-4908-b81e-37d45fd3ed89",
            "ApproximateArrivalTimestamp": 1457016825.292,
            "SequenceNumber": "49559738018025937878454287328206787327268349828233428994"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAGNLrvu6z30rF6F3Fd2s/F1DNfET2t+awpzUpgrOzmcziRoHyoYW1PcoapF1oTZHtMZhKxWS7GS53L+Aer8nj9fJB33odOa61WNlLh4HKXS22/G/E13ltpTFuan7kbE0ftfovlk+rSrzDbhC8lJQaPjw2HD0B7O0rpYDiXF3awoA3H1RsuN1GwiajEk2qTa2rXHaDJ9kdF8pogzAGbLmu9C",
    "MillisBehindLatest": 0
}

実際のデータは base64 エンコードされているので、デコードして中身を確認します。

$ DATA="eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM4MzksInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjQwLjAiLCJ0aW1lIjoiMjAxNi0wMy0wM1QxNDo1MDo1NloiLCJ0YWciOiJsb2cuaHR0cGQuYWNjZXNzIn0="
$ echo -n $DATA | base64 -d
{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":403,"size":3839,"referer":null,"agent":"curl/7.40.0","time":"2016-03-03T14:50:56Z","tag":"log.httpd.access"}

確かにApacheログですね。

まとめ

今回は fluentd から Kinesis Stream にデータ送信する方法を紹介しました。 ログを集約したいという要求に対して、すべてを自前で構築するのは多くの困難が伴います。

可用性もデータ耐久性を兼ね備えたログ集約サーバーとしてAWSフルマネージドな Kinesis Stream を採用し、ログデータ送信には fluentd を採用すると、簡単にログ集約システムを構築できます。

Kinesis には、今回紹介した Kinesis Stream の他にも、S3/Redshift 連携を自動でやってくれる Kinesis Firehose も存在します。fluentdとKinesis Firehoseの連携は次の記事を参照ください。

fluent-plugin-kinesis-firehoseでAmazon Kinesis Firehoseにログを転送する | Developers.IO

参考リンク

  • https://aws.amazon.com/kinesis/
  • https://docs.treasuredata.com/articles/td-agent
  • https://github.com/awslabs/aws-fluent-plugin-kinesis