fluent-plugin-kinesisがAmazon Kinesis FirehoseとKPLに対応したので試してみた

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

ども、大瀧です。
OSSのデータコレクタFluentdのKinesisプラグインであるfluent-plugin-kinesisのバージョン1.0リリースされ、従来からサポートするAmazon Kinesis Streamに加えて、Amazon Kinesis FirehoseKPL(Kinesis Producer Library)に対応しました!ためしてみた様子をレポートします!

セットアップ

動作確認環境

  • OS : Ubuntu Server 14.04 Trusty
  • Flunetd : td-agent 2.3.1

RubyGemsにデプロイ済みなので、td-agentに同梱されるtd-agent-gemで簡単にインストールできます。td-agentとセットでインストールしました。

$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   563  100   563    0     0   1882      0 --:--:-- --:--:-- --:--:--  1882
This script requires superuser access to install apt packages.
  :(略)
Starting td-agent:  * td-agent
Processing triggers for libc-bin (2.19-0ubuntu6.6) ...
$ sudo td-agent-gem install fluent-plugin-kinesis
Fetching: concurrent-ruby-1.0.1.gem (100%)
Successfully installed concurrent-ruby-1.0.1
Fetching: os-0.9.6.gem (100%)
Successfully installed os-0.9.6
Fetching: middleware-0.1.0.gem (100%)
Successfully installed middleware-0.1.0
Fetching: protobuf-3.6.7.gem (100%)
Successfully installed protobuf-3.6.7
Fetching: fluent-plugin-kinesis-1.0.0.gem (100%)
Successfully installed fluent-plugin-kinesis-1.0.0
Parsing documentation for concurrent-ruby-1.0.1
Installing ri documentation for concurrent-ruby-1.0.1
Parsing documentation for os-0.9.6
Installing ri documentation for os-0.9.6
Parsing documentation for middleware-0.1.0
Installing ri documentation for middleware-0.1.0
Parsing documentation for protobuf-3.6.7
Installing ri documentation for protobuf-3.6.7
Parsing documentation for fluent-plugin-kinesis-1.0.0
Installing ri documentation for fluent-plugin-kinesis-1.0.0
Done installing documentation for concurrent-ruby, os, middleware, protobuf, fluent-plugin-kinesis after 6 seconds
5 gems installed
$

以前のバージョン(`0.x`)から、Kinesis Streamを利用するタイプ名が変更(`kinesis`→`kinesis_streams`)されています。アップグレードする場合は注意しましょう。

試しに、Apacheログ(/var/log/apache2/access.log)をKinesis Firehoseのデリバリーストリームtakipone-testに送信するように設定してみました。takipone-testではS3のバケットtakipone-firehosetestに転送するよう、あらかじめ設定しています。いずれもオレゴン(us-west-2)リージョンに作成しました。

<source>
  @type tail
  path /var/log/apache2/access.log
  pos_file /var/log/td-agent/httpd-access.log.pos
  tag apache.access
  format apache2
</source>

<match **>
  @type kinesis_firehose
  region us-west-2
  delivery_stream_name takipone-test
</match>

flush_interval 1
buffer_chunk_limit 1m
try_flush_interval 0.1
queued_chunk_flush_interval 0.01
num_threads 15
detach_process 5

122〜127行目は、GitHubのREADMEにある推奨パラメータです。調節しつつ使いましょう。

併せて、いずれかの方法でAWSの認証情報(APIキーなど)を設定します。今回は、EC2で動作させるのでIAMロールにKinesisにアクセスする権限を与えて設定しました。GitHubのREADMEが参考になります。

では、td-agentを再起動して設定ファイルを読み込み、abでログファイルに書き込んでみます。

$ sudo service td-agent restart
Restarting td-agent:  * td-agent
$ ab -n 10000 http://localhost/
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests
  : (略)
$

しばらく待つと、S3のバケットにファイルが作成されました!

fluentd-kinesis-plugin01

中身は、ApacheのログデータがJSON形式で格納されています。

{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}
{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}
{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}
  :(略)

KPL(Kinesis Producer Library)も試してみます。以下のようにtd-agent.confを変更しました。併せて、Kinesis Streamのストリームtakipone-testを作成して準備します。

<match **>
  @type kinesis_producer
  region us-west-2
  stream_name takipone-test
</match>

これでOKです。先ほどと同じく、abコマンドを何度か実行した後、ストリームからレコードを取り出してみます。空のレコードが数レコード続いてから、Fluentdから送ったレコードが出てきます。

$ aws kinesis get-shard-iterator --region us-west-2 --output json --stream-name takipone-test --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON
{
    "ShardIterator": "AAAAAAAAAAEsTU/Sld4XveRKzirhuga3xrC7K2BJZ5lT72OvvkJq5qvaYjkJvE7NOT4btXWTaM1D7cTjFEkavThzYTa4S2c2uEbZHPUCWx/Im5bw9bcP3oNPeeY5B9Y9MPGPH+P42Gptz65BupY4RlkDQ+HCmhSIhiaUTcxQGajfIvo9x2meIJhHGOMwkegEkl5W5RBavkP9yC2DCqnOYbPo4TuH2tbr"
}
$ aws kinesis get-records --region us-west-2 --output json --shard-iterator AAAAAAAAAAEsTU/Sld4XveRKzirhuga3xrC7K2BJZ5lT72OvvkJq5qvaYjkJvE7NOT4btXWTaM1D7cTjFEkavThzYTa4S2c2uEbZHPUCWx/Im5bw9bcP3oNPeeY5B9Y9MPGPH+P42Gptz65BupY4RlkDQ+HCmhSIhiaUTcxQGajfIvo9x2meIJhHGOMwkegEkl5W5RBavkP9yC2DCqnOYbPo4TuH2tbr
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAEQopt7XsozsDxyBVPuDkZCZtv3vPiGip7fG1Yy9ZruraEgS+zLjfGupIqHU+xv7nsKoCZ4GfUfmpRs6IoaQy+FkDfxziLXqYkJ+ZA2MTQXYjQZmlXEpZAwQm/pRwe5jCMUKAEsyxu24gFbith3p8AnjaRDOWIF+Z2wagu6ENFp084qJAkr7+R/3NZ16i8wpNJbvA4G6HkJDhcxxmUZXiax",
    "MillisBehindLatest": 62926000
}
$ aws kinesis get-records --region us-west-2 --output json --shard-iterator AAAAAAAAAAFlrcyI/mH5mU6nVlojqU6vqI/hFFJnoTDH1qMfP6Os5/iV3Mna+bWrWEgOe00djhGXr0cwDdlRDdg5dDsR/Mmjj/cVsAC2aGB6+JV11TMoQE0qvdGcZ+dfkIGKH3epEW5ZXgDjIb8YVXYKm9R/vHraz9jCHSly0WSUPs0pxvIVD6YjLMwS5xgx2AmmCTRF8IQPUAng1vaCNpmt6ItjGNbO
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAFlcnzQhBsDNI27oBJiHorl74FEBAw+5rkjD2LoMgHc/Nwqhz0NNpobQJwMGdBkST4HM2rLGLKOlZ+qMRzn9soWpwFpfEbyItCvm6eA9od8f5mXIrqBivfdxoeg8R4pxfAvIlo2IySo2LnaocizN6ECr62O2j4VxwTHh6Fy1KlaSKyrZexUv0KfmrrxxBk82JyKu+DM+++Mgvnn4cBwwKJ3",
    "MillisBehindLatest": 38149000
}
$ aws kinesis get-records --region us-west-2 --output json --shard-iterator AAAAAAAAAAGG2Kp28GH2eyxoBR7xYwXyOIf4ouPikGOrk1BCBxABK+1a5rxqGPYYY3/5exN7ql8+ODFaDFPBg589+Glenx20N1jE8fxyKIzjKWyIkV7apM7u/8yAdI1UrFGo4kD5MawqpFbf8IxgAwySce1qAe9zkpw3Rp4KbzGyFRa+/Jn3cjvjnJSLT+5eGtYDnJdq11KAXT7amGSip0VBjhcgtPk/
{
    "Records": [
        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjoyMDAsInNpemUiOjExNzY0LCJyZWZlcmVyIjpudWxsLCJhZ2VudCI6ImN1cmwvNy4zNS4wIn0=",
            "PartitionKey": "a11392987a071f957ab3505bfc25cace",
            "ApproximateArrivalTimestamp": 1458397403.342,
            "SequenceNumber": "49559948046087405716686625491085803253240564075691245570"
        },
        {
            "Data": "84mawgogNDJmZTg3OWMzOGQ2YmQ5YjM4ZTAyODk5Y2IwZDBhOTAKIDRlMzQ2NDFjYmEwZGFkOGZlZGFiMGNiN2Y3NzVlMjRiCiBmZDg0NGRkZTUyNWI3OTVlYjA3NGIyNmY4ZTlhMzRhOBp/CAAae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAEae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAIae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifWI6nYO/8w3iHbNp9OPICuY=",
            "PartitionKey": "a",
            "ApproximateArrivalTimestamp": 1458397523.517,
            "SequenceNumber": "49559948046087405716686625493830064863765780548611473410"
        },
        :(略)
    ],
    "NextShardIterator": "AAAAAAAAAAGwSuYZZGtgEmrq/oW3ln+uK47Crz2bX569pLpjCQkTc27Et8rTKjT3s8QmGXIon4iXlBP12Jx3e3uFtAmnUpTYoiIoGXLBngxFyQc/d8ZYQ54PE+AF/whhBKoy5FNE6uENHeP7uBMqjCYq2G3hH55ZT2TFjVOhKC9Q0nRiN5+Gn+5YCCZIIvM/2a7hF5Ec/pOxJ6anG1EqDZT9mml+Yj9O",
    "MillisBehindLatest": 6215000
}

KPLは、Protocol Buffers形式でレコードを集約するライブラリです。フォーマットは KPLのGitHubが参考になります。

Kinesis StreamsのレコードはBASE64エンコードされているので、デコードしてみると様子が確認できます。

$ echo '84mawgogNDJmZTg3OWMzOGQ2YmQ5YjM4ZTAyODk5Y2IwZDBhOTAKIDRlMzQ2NDFjYmEwZGFkOGZlZGFiMGNiN2Y3NzVlMjRiCiBmZDg0NGRkZTUyNWI3OTVlYjA3NGIyNmY4ZTlhMzRhOBp/CAAae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAEae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAIae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifWI6nYO/8w3iHbNp9OPICuY=' | base64 -D
��
 42fe879c38d6bd9b38e02899cb0d0a90
 4e34641cba0dad8fedab0cb7f775e24b
 fd844dde525b795eb074b26f8e9a34a8{{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}{{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"{{��i���:"127.0.0.1","user":null,"method":"GET","path":"/","code":200,"size":11783,"referer":null,"agent":"ApacheBench/2.3"}b:����
�$

複数のレコードが入っていますね! KPLで集約されたレコードは、同じくKPLで解除する必要があります。最近、Kinesis StreamからPullするKPL対応のLambdaのサンプルコードが公開されたので、手軽に使うのであればこれを利用するのが良いでしょう。

まとめ

FirehoseとKPLに対応し、大幅にパワーアップしたfluent-kinesis-pluginをご紹介しました。歴史のあるプラグインで、かつ日本人エンジニアがコミッタ *1のプラグインなので引き続き応援していきたいです!

個人的には、これまで有用とわかっていながらJavaしか実装が無く手が出せなかったKPLが、Lambdaのコードとセットで一気に身近になった印象です。KPLもバシバシ検討していきましょう!

脚注

  1. imaifactoryさんからriywoさんに引き継がれたようです