Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる

2016.06.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

2016/06/03(金)のAWS Summit Tokyo 2016でクックパッド株式会社の星さんが「秒間数万のログをいい感じにするアーキテクチャ」という発表をされました。

クックパッドのログ収集基盤が Fluentd や Amazon Kinesis Streams や Redshift を活用していい感じするアーキテクチャが参考になった方は非常に多いと思います(あの規模のサービスに関わるかはともかく)。

発表の中でKinesis Producer Library(以下 KPL)を使ってログ数多すぎ問題を解決したことが語られていました。

個人的には、Kinesis Streams のシャード数を増やせばどうにかなるレベルのサービスしか担当したことがないため、KPL の検討は見送ってきましたが、発表にあった KPL のログ集約によるスループットの向上が衝撃的だったため、最小構成の KPL を実際に動かしてみました。

なお、今回紹介する構成を

  • Amazon Kinesis Data Streams
  • KPL
  • fluentd
  • Amazon Linux(1 -> 2)
  • Lambda(Python 2.7 ->Python 3.6)

に更新したバージョンは、次のブログを参照下さい。

Kinesis Producer Library(KPL)とfluentdとLambda(Python3)を連携させる(Amazon Linux2版)

KPL について

KPL は 効率的に Kinesis Streams に書きこむライブラリで、以下の様な特徴があります

  • 自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む
  • レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
  • ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
  • コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する
  • Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする

KPL の詳細は次のドキュメントを参照ください。

KPL の動作確認内容

1. 最小構成の KPL ログ集約型 Kinesis メッセージングシステムを構築

  • ストリームには Amazon Kinesis Streams を利用
  • Producer には aws-fluent-plugin-kinesis を使い、 KPL で aggregate させる
  • Consumer には Lambda(Python 2.7) を使い awslabs/kinesis-aggregation でログを deaggregate させる

2. ログ集約によるレコード数の削減を確認

  • ストリームには Amazon Kinesis Streams を利用
  • Producer には aws-fluent-plugin-kinesis を利用
  • 非 aggregate 方式と KPL による aggregate 方式で、書き込み時のレコード数の違いを確認

検証環境

  • OS : Amazon Linux AMI release 2016.03
  • fluentd : td-agent 2.3.1-0.el2015
  • fluent-plugin-kinesis : 1.0.1

では、順に、動作確認します。

1. 最小構成を構築

最終形

kinesis-producer-library

EC2 に IAM Role の設定

サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:*"
            ],
            "Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*”
        }
    ]
}

Kinesis ストリームの操作を許可するほか、KPL はカスタムメトリクスも利用するため"cloudwatch:PutMetricData” も許可してください。

許可し忘れていると、fluentd が以下の様なcloudwatch:PutMetricDataの実行エラーログを吐きます。

<ErrorResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
  <Error>
    <Type>Sender</Type>
    <Code>AccessDenied</Code>
    <Message>User: arn:aws:sts::123456789012:assumed-role/KinesisFluentd/i-7a6a0ae5 is not authorized to perform: cloudwatch:PutMetricData</Message>
  </Error>
  <RequestId>dd56c5ed-299b-11e6-b9b2-299f16fe361d</RequestId>
</ErrorResponse>

ポリシーの詳細は次のドキュメントを参照ください。

http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html

Kinesis Stream の設定

Kinesis Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。

$ STREAM_NAME=test
$ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 2
$ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus`  becomes `ACTIVE`

Apache のインストール

今回は Apache のログを Kinesis Stream に送信します。 Apache をインストールします。

$ sudo yum install -y httpd
$ sudo service httpd start
Starting httpd:                                            [  OK  ]

HTTP アクセスして、Apache が起動していることを確認しましょう。

$ curl localhost/

fluentd の設定

最後に fluentd の設定を行います。

fluentd のインストール

fluentd の安定版である td-agent をインストールします。

$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
...
Installed:
  td-agent.x86_64 0:2.3.1-0.el2015

Complete!

fluentd の Kinesis Stream プラグインのインストール

Kinesis Stream プラグインとして fluent-plugin-kinesis をインストールします。

$ sudo td-agent-gem install fluent-plugin-kinesis
Fetching: concurrent-ruby-1.0.2.gem (100%)
Successfully installed concurrent-ruby-1.0.2
Fetching: os-0.9.6.gem (100%)
Successfully installed os-0.9.6
Fetching: middleware-0.1.0.gem (100%)
Successfully installed middleware-0.1.0
Fetching: protobuf-3.6.9.gem (100%)
Successfully installed protobuf-3.6.9
Fetching: fluent-plugin-kinesis-1.0.1.gem (100%)
Successfully installed fluent-plugin-kinesis-1.0.1
Parsing documentation for concurrent-ruby-1.0.2
Installing ri documentation for concurrent-ruby-1.0.2
Parsing documentation for os-0.9.6
Installing ri documentation for os-0.9.6
Parsing documentation for middleware-0.1.0
Installing ri documentation for middleware-0.1.0
Parsing documentation for protobuf-3.6.9
Installing ri documentation for protobuf-3.6.9
Parsing documentation for fluent-plugin-kinesis-1.0.1
Installing ri documentation for fluent-plugin-kinesis-1.0.1
Done installing documentation for concurrent-ruby, os, middleware, protobuf, fluent-plugin-kinesis after 7 seconds
5 gems installed

KPL 向けに protobuf など、いろいろな gem がインストールされています。

fluentd設定ファイルの修正

/etc/td-agent/td-agent.conf にある設定ファイルを修正します。

アクセスログファイル/var/log/httpd/access_logtailし、書き込まれたデータを Kinesis Stream に送信します。

データソースの定義が <source> タグ、データ処理の定義が<match> タグです。

<source>
  @type tail
  format apache2
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd-access.pos
  tag log.httpd.access
</source>

<match log.httpd.*>
  @type kinesis_producer
  stream_name test
  region ap-northeast-1
</match>

fluentd を KPL の Producer として利用するには match ディレクティブの type を kinesis_producer にします。

詳細は次のドキュメントを参照ください。

https://github.com/awslabs/aws-fluent-plugin-kinesis#configuration-kinesis_producer

設定ファイルのシンタックスチェック

設定ファイルのシンタックスチェックをします。

$ td-agent --dry-run -c /etc/td-agent/td-agent.conf
2016-06-04 17:21:45 +0000 [info]: reading config file path="kpl.conf"
2016-06-04 17:21:45 +0000 [info]: starting fluentd-0.12.20 as dry run mode
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-kinesis' version '1.0.1'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-06-04 17:21:45 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-06-04 17:21:45 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-06-04 17:21:45 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_producer"
2016-06-04 17:21:45 +0000 [info]: adding source type="tail"
2016-06-04 17:21:45 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type tail
    format apache2
    path /var/log/httpd/access_log
    pos_file /var/log/td-agent/httpd-access.pos
    tag log.httpd.access
  </source>
  <match log.httpd.*>
    @type kinesis_producer
    stream_name kpl
    region ap-northeast-1
    debug true
    log_level info
    <kinesis_producer>
    </kinesis_producer>
  </match>
</ROOT>
$ echo $?
0

エラーメッセージが表示されず、ステータスコードも正常の「0」です。

fluentd の起動

fluentd をフォアグラウンド実行してみましょう。

$ sudo td-agent -c /etc/td-agent/td-agent.conf
2016-06-04 17:36:30 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-06-04 17:36:30 +0000 [info]: starting fluentd-0.12.20
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-kinesis' version '1.0.1'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-06-04 17:36:30 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-06-04 17:36:30 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-06-04 17:36:30 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_producer"
2016-06-04 17:36:30 +0000 [info]: adding source type="tail"
2016-06-04 17:36:30 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type tail
    format apache2
    path /var/log/httpd/access_log
    pos_file /var/log/td-agent/httpd-access.pos
    tag log.httpd.access
  </source>
  <match log.httpd.*>
    @type kinesis_producer
    stream_name kpl
    region ap-northeast-1
    debug true
    log_level info
    <kinesis_producer>
    </kinesis_producer>
  </match>
</ROOT>
2016-06-04 17:36:30 +0000 [info]: following tail of /var/log/httpd/access_log
[2016-06-04 17:36:31.797746] [0x00007fbc3f4d1740] [info] [metrics_manager.h:148] Uploading metrics to monitoring.ap-northeast-1.amazonaws.com:443

Apache にリクエストをして、Kinesis Stream に送信するデータを生成します。

$ curl localhost

しばらく動かしてみて、問題なさそうなら、 fluentd をバックグラウンド実行しましょう。

$ sudo service td-agent start
Starting td-agent:                       [  OK  ]

pstree の実行結果から td-agent の子プロセスとして KPL プロセス(kinesis_produce)が起動しているのがわかります。

$ pstree -c 7561
sudo───td-agent─┬─td-agent─┬─kinesis_produce─┬─{kinesis_produce}
                │          │                 ├─{kinesis_produce}
                │          │                 ├─{kinesis_produce}
                │          │                 ├─{kinesis_produce}
                │          │                 ├─{kinesis_produce}
                │          │                 └─{kinesis_produce}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          ├─{td-agent}
                │          └─{td-agent}
                └─{td-agent}

Lambda 関数の実装

KPL は複数のログを1レコードに集約(aggregate)してKinesisにPutRecord します。 そのため、Consumer 側は集約されたログを分割(deaggregate)します。

awslabs/kinesis-aggregation で Python Lambda を実装

AWS が GitHub に公開している awslabs/kinesis-aggregation という Lambda 向け KPL の aggregate/deaggregate ライブラリを利用します。

  • Python
  • Java
  • Node.js

と各 Lambda ランタイム向けライブラリが含まれていますが、今回は Python のものを利用します。

$ git clone https://github.com/awslabs/kinesis-aggregation.git
$ cd kinesis-aggregation/python/

移動したディレクトリは、KPL で送信された集約ログを分解し、print するだけの Lambda 関数(lambda_function.py) がブループリントとして用意されているので、今回はこのファイルを利用しましょう。

lambda_function.py の中身を抜粋

aws_kinesis_agg モジュールには集約されたログを分割する Lambda 向け(悪く言うと、Lambda イベントソース決め打ち)の関数が用意されています。あとは、その関数を呼び出して、各ログを処理するだけです。

  • ジェネレーターベース : iter_deaggregate_records
  • 非ジェネレーターベース : deaggregate_records

の2種類があります。

実際の Lambda 関数を一部抜粋します。

from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records
import base64

def lambda_bulk_handler(event, context):
    '''A Python AWS Lambda function to process Kinesis aggregated
    records in a bulk fashion.'''

    raw_kinesis_records = event['Records']

    #Deaggregate all records in one call
    user_records = deaggregate_records(raw_kinesis_records)

    #Iterate through deaggregated records
    for record in user_records:

        # Kinesis data in Python Lambdas is base64 encoded
        payload = base64.b64decode(record['kinesis']['data'])
        print('%s' % (payload))

    return 'Successfully processed {} records.'.format(len(user_records))

Lambda パッケージ(Zip)化

lambda_function.py を元に Lambda 関数として Zip ファイルにまとめるにはどうすればよいでしょうか?

直接利用している aws_kinesis_agg モジュールの他に、シリアライズに利用している Google Protobuf モジュールなども必要です。 Protobuf は Lamba パッケージ化するのが面倒なため、以下のコマンドを叩くと、python_lambda_build.zip というZipファイル化してくれます。

$ python make_lambda_build.py

lambda_function.py をカスタマイズして、依存ライブラリを追加した時は、make_lambda_build.py の上のほうにある定数 PIP_DEPENDENCIES に依存ライブラリを追加してください。

AWSマネージメントコンソールからLambda 関数を登録

作成したZipファイルを管理画面からアップロードして Lambda 関数を以下の内容で作成します。

  • Runtime : Python 2.7
  • Handler : lambda_function.lambda_generator_handler
  • Event sources : “test” Kinesis Streams

CLoudWatch Logs から Lambda の実行ログを確認

Lambda 関数内では、分割した各ログデータを print 出力しています。 CloudWatch > Log Groups > Streams for /aws/lambda/YOUR_LAMBDA_FUNCTION_NAME のログストリームに、deaggregate されたログが出力されていることを確認します。

kinesis-kpl-logs

CloudWatch Custom Metrics の確認

KPL CloudWatch Custom Metrics を出力します。

集約前のログ(User Records)数やRecordPuts 時のエラー状況など、標準の CloudWatch では取得されないメトリクス情報が記録されます。

kinesis-kpl-custom-metrics

詳細は次のドキュメントを参照ください。

http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kpl.html

2. ログ集約によるレコード数の削減を確認

次に、非 aggregate 方式と KPL による aggregate 方式で、書き込み時のレコード数の違いを確認してみましょう。

Kinesis Streams の作成

  • kinesis_streams(非 aggregate 用)
  • kinesis_producer(KPL aggregate 用)

という2本の Kinesis ストリームを作成します。

fluentd の設定

同じログを上記それぞれの Kinesis Streams に集約方式を変えて送信するように以下の設定を行います。 match ディレクトリを @type copy で複製し、片方は @type kinesis_streams 、もう片方は @type kinesis_producer にしています。

<source>
  @type tail
  format apache2
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd-access.pos
  tag log.httpd.access
</source>

<match log.httpd.*>
  @type copy
  <store>
    @type kinesis_streams
    stream_name kinesis_streams
    region ap-northeast-1
    # random_partition_key true
    #use_yajl true
    debug true
    log_level info
  </store>

  <store>
    @type kinesis_producer
    stream_name kinesis_producer
    region ap-northeast-1
    # random_partition_key true
    #use_yajl true
    debug true
    log_level info
  </store>
</match>

Apache にリクエストを繰り返す

以下の様なシェルスクリプトを用意し、Apache にランダムインターバルで HTTP リクエストさせます。

#!/bin/bash

while :
do
  curl --silent localhost -o /dev/null
  sleep `echo "scale=1; $RANDOM % 100 / 10" | bc`
done

Kinesis Streams に投入されたレコード数を比較

HTTP リクエストするシェルスクリプトをしばらく動かし、CloudWatch Metrics の Kinesis -> PutRecords.Records の Average をとったのが以下です。

kinesis-kpl-putrecords-records-compare

なかなか衝撃的な結果です。 オレンジの KPL 集約版(kinesis_producer)は1に対して、ブルーの非集約版(kinesis_streams)は 250 前後をさまよっています。

これはどういうことかというと、ログを KPL で集約することで、Kinesis Streams に送信するレコード数を激減できることを意味します。

Kinesis Streams の書き込み処理には

  • シャードあたり 1000 records/second
  • 500 records/transaction(PutRecords の場合)

というようなハードリミットが存在し、リミットに引っかからないように、シャード数を増やして対応したかたは多いかと思います。

御覧頂いたように、KPL を使うことで、シャード数の削減やスループットの向上を期待できそうです。

まとめ

fluentd/KPL -> Kinesis Streams -> Lambda の連携方法とログ集約による Kinesis Streams のスループット向上を見てきました。

  • Kinesis Producer のコンピューターリソース
  • ログデータの特性
  • Kinesis Consumer の制限

などにより、KPL をすんなりと導入できるとは限らないと思いますが、Kinesis のスループットにお困りの場合は、一度検討してみてはいかがでしょうか。

参考