Kinesis Producer Library(KPL)とfluentdとLambda(Python3)を連携させる(Amazon Linux2版)

2018.02.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS のストリーミングサービス Kinesis には、データ・インジェスションの性能を高め、運用を楽にするKinesis Producer Library(以下 KPL)が存在します。

ユースケース

2016年6月には、以下のような構成で KPL でデータ連携する構築手順を紹介しました。

  • Amazon Kinesis Data Streams
  • KPL
  • fluentd
  • Amazon Linux 1
  • Lambda(Python 2.7)

Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる

今回はそのアップデート版として

  • Amazon Linux → Amazon Linux 2(以下 AL2)
  • Lambda(Python 2.7 → Python 3.6)

に更新した手順を紹介します。

Amazon Linux 2 は 2018/02/11 時点で release candidate (RC) です。

KPL レコードの deaggregate に利用する kinesis-aggregation ライブラリの Python 3 対応は 2018/02/11 時点で WIP です。

変更点

構築上の大きな変更点には以下があります。

  • AL2 への変更により systemd で flutend プロセスを管理
  • KPL データのデシリアライズに利用するライブラリ kinesis-aggregation は Python 3 対応した 1.1 以上を利用

KPL について

KPL は 効率的に Kinesis Data Streams に書きこむライブラリで、以下の様な特徴があります

  • 自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む
  • レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
  • ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
  • コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する
  • Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする

KPL のデータを処理する Consumer は今回紹介する Lambda 以外にも Kinesis Client Library(KCL) を利用するアプローチもよく採用されます。

KCL は EC2 上で動作するため、 Lambda に比べて自由度が高い一方で、ユーザーが管理する範囲が大幅に増えます。マネージドサービス利点を活かし、Lambda 内のロジックに注力するか、EC2 上で自分たちで好きにやるか。両者は一長一短です。サービスや開発チームの特性に合わせて選択ください。

KPL の詳細は次のドキュメントを参照ください。

KPL の動作確認内容

  • ストリームには Amazon Kinesis Data Streams を利用
  • Producer には aws-fluent-plugin-kinesis を使い、 KPL で aggregate させる
  • Consumer には Lambda を使い kinesis-aggregation ライブラリ でログを deaggregate させる
  • 転送ログは Apache のアクセスログを利用

検証環境

  • OS : Amazon Linux 2 LTS Candidate AMI 2017.12.0 (HVM), SSD Volume Type - ami-1b2bb774
  • fluentd : td-agent 3.1.1-0.el2
  • fluent-plugin-kinesis : 2.1.0
  • kinesis-aggregation : リリース前の py3-compat ブランチ
  • Apache HTTP Server : Apache/2.4.6

では、順に構築します。

最終形

kinesis-producer-library

EC2 に IAM Role の設定

サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:*"
            ],
            "Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*”
        }
    ]
}

Kinesis ストリームの操作を許可するほか、KPL はカスタムメトリクスも利用するため"cloudwatch:PutMetricData” も許可してください。

許可し忘れていると、fluentd が以下の様なcloudwatch:PutMetricDataの実行エラーログを吐きます。

<ErrorResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
  <Error>
    <Type>Sender</Type>
    <Code>AccessDenied</Code>
    <Message>User: arn:aws:sts::123456789012:assumed-role/KinesisFluentd/i-7a6a0ae5 is not authorized to perform: cloudwatch:PutMetricData</Message>
  </Error>
  <RequestId>dd56c5ed-299b-11e6-b9b2-299f16fe361d</RequestId>
</ErrorResponse>

ポリシーの詳細は次のドキュメントを参照ください。

http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html

Kinesis Data Stream の作成

Kinesis Data Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。

$ STREAM_NAME=test
$ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 2
$ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus`  becomes `ACTIVE`

Apache のインストール

今回は Apache のログを Kinesis Data Stream に送信します。 Apache をインストールします。

$ sudo yum install -y httpd

systemctl でプロセス起動させます。

$ sudo systemctl start httpd.service
$ sudo systemctl status httpd.service
● httpd.service - The Apache HTTP Server
   Loaded: loaded (/usr/lib/systemd/system/httpd.service; disabled; vendor preset: disabled)
   Active: active (running) since Wed 2018-02-07 21:37:07 UTC; 5s ago
     Docs: man:httpd(8)
           man:apachectl(8)
 Main PID: 3301 (httpd)
   Status: "Processing requests..."
   CGroup: /system.slice/httpd.service
           ├─3301 /usr/sbin/httpd -DFOREGROUND
           ├─3302 /usr/sbin/httpd -DFOREGROUND
           ├─3303 /usr/sbin/httpd -DFOREGROUND
           ├─3304 /usr/sbin/httpd -DFOREGROUND
           ├─3305 /usr/sbin/httpd -DFOREGROUND
           └─3306 /usr/sbin/httpd -DFOREGROUND

Feb 07 21:37:07 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Starting The Apache HTTP Server...
Feb 07 21:37:07 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Started The Apache HTTP Server.

HTTP アクセスして、Apache が起動していることを確認しましょう。

$ sudo touch /var/www/html/index.html  # index.html ファイルを作成
$ curl -I localhost
HTTP/1.1 200 OK
Date: Wed, 07 Feb 2018 21:41:17 GMT
Server: Apache/2.4.6 (Amazon Linux 2)
Last-Modified: Wed, 07 Feb 2018 21:41:14 GMT
ETag: "0-564a627886c85"
Accept-Ranges: bytes
Content-Type: text/html; charset=UTF-8

fluentd の設定

最後に fluentd の設定を行います。

fluentd のインストール

fluentd の安定版である td-agent をインストールします。

インストールスクリプトは AL1 向けと AL2 向けで異なります。 今回は、当然ながら AL2 向けのインストールスクリプトを利用します。

$ curl -L https://toolbelt.treasuredata.com/sh/install-amazon2-td-agent3.sh | sh
...
Installed:
  td-agent.x86_64 0:3.1.1-0.el2

Dependency Installed:
  avahi-libs.x86_64 0:0.6.31-17.amzn2                    cups-client.x86_64 1:1.6.3-29.amzn2     cups-libs.x86_64 1:1.6.3-29.amzn2     m4.x86_64 0:1.4.16-10.amzn2
  mailx.x86_64 0:12.5-16.amzn2                           patch.x86_64 0:2.7.1-8.amzn2            spax.x86_64 0:1.5.2-13.amzn2          system-lsb-core.x86_64 0:4.1-27.amzn2.3
  system-lsb-submod-security.x86_64 0:4.1-27.amzn2.3

Complete!

Installation completed. Happy Logging!

NOTE: In case you need any of these:
  1) security tested binary with a clear life cycle management
  2) advanced monitoring and management
  3) support SLA
Please check Fluentd Enterprise (https://www.treasuredata.com/fluentd/).

fluentd の Amazon Kinesis プラグインのインストール

Amazon Kinesis プラグインとして fluent-plugin-kinesis をインストールします。

$ sudo td-agent-gem install fluent-plugin-kinesis
...
131 gems installed

KPL 向けに protobuf など、いろいろな gem がインストールされています。

fluentd 設定ファイルの修正

/etc/td-agent/td-agent.conf にある設定ファイルを修正します。

アクセスログファイル/var/log/httpd/access_logtailし、書き込まれたデータを Kinesis Stream に送信します。

データソースの定義が <source> タグ、データ処理の定義が<match> タグです。

/etc/td-agent/td-agent.conf

<source>
  @type tail
  format apache2
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd-access.pos
  tag log.httpd.access
</source>

<match log.httpd.*>
  @type kinesis_streams_aggregated
  stream_name kpl-stream
  region eu-west-1
</match>

fluentd を KPL の Producer として利用するには match ディレクティブの type を kinesis_producer にします。

詳細は次のドキュメントを参照ください。

https://github.com/awslabs/aws-fluent-plugin-kinesis

fluentd の設定ファイルのシンタックスチェック

設定ファイルのシンタックスチェックをします。

$ td-agent --dry-run -c /etc/td-agent/td-agent.conf
2018-02-07 20:53:37 +0000 [info]: parsing config file is succeeded path="/etc/td-agent/td-agent.conf"
2018-02-07 20:53:37 +0000 [info]: starting fluentd-1.0.2 as dry run mode ruby="2.4.2"
$ echo $?
0

エラーメッセージは表示されません。

flutend の systemd ファイルの変更

flurentd プロセスはデフォルトでは td-agent ユーザー・グループ権限で起動されます。

Apache のログディレクトリの権限変更・グループ操作などを行わない場合、Apache のログ読み込み時に以下のエラーが発生します。

2018-02-07 21:08:02 +0000 [error]: #0 unexpected error error_class=Errno::EACCES error="Permission denied @ rb_sysopen - /var/log/td-agent/httpd-access.pos"

今回は fluentd プロセスをルート権限で実行させます。

/usr/lib/systemd/system/td-agent.service

[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target

[Service]
# User=td-agent
# Group=td-agent
User=root
Group=root
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120

[Install]
WantedBy=multi-user.target

fluentd の起動

fluentd をフォアグラウンド実行してみましょう。

$ sudo td-agent -c /etc/td-agent/td-agent.conf
2018-02-10 12:21:21 +0000 [info]: parsing config file is succeeded path="/etc/td-agent/td-agent.conf"
2018-02-10 12:21:21 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type tail
    format apache2
    path "/var/log/httpd/access_log"
    pos_file "/var/log/td-agent/httpd-access.pos"
    tag "log.httpd.access"
    <parse>
      @type apache2
    </parse>
  </source>
  <match log.httpd.*>
    @type kinesis_streams_aggregated
    stream_name "test"
    region "ap-northeast-1"
    <buffer>
      flush_mode interval
      retry_type exponential_backoff
    </buffer>
  </match>
</ROOT>
2018-02-10 12:21:21 +0000 [info]: starting fluentd-1.0.2 pid=3252 ruby="2.4.2"
2018-02-10 12:21:21 +0000 [info]: spawn command to main:  cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/sbin/td-agent", "-c", "/etc/td-agent/td-agent.conf", "--under-supervisor"]
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '2.4.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-kafka' version '0.6.5'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-kinesis' version '2.1.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.0.1'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-s3' version '1.1.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-td' version '1.0.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.3'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-webhdfs' version '1.2.2'
2018-02-10 12:21:21 +0000 [info]: gem 'fluentd' version '1.0.2'
2018-02-10 12:21:21 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_streams_aggregated"
2018-02-10 12:21:21 +0000 [info]: adding source type="tail"
2018-02-10 12:21:21 +0000 [info]: #0 starting fluentd worker pid=3256 ppid=3252 worker=0
2018-02-10 12:21:21 +0000 [info]: #0 following tail of /var/log/httpd/access_log
2018-02-10 12:21:21 +0000 [info]: #0 fluentd worker is now running worker=0

Apache にリクエストをして、Kinesis Data Stream に送信するデータを生成します。

$ curl localhost

問題が見つからなければ、 fluentd を systemctl でバックグラウンド実行しましょう。

$ sudo systemctl start td-agent.service

$ sudo systemctl status td-agent.service
● td-agent.service - td-agent: Fluentd based data collector for Treasure Data
   Loaded: loaded (/usr/lib/systemd/system/td-agent.service; disabled; vendor preset: disabled)
   Active: active (running) since Wed 2018-02-07 21:28:10 UTC; 7s ago
     Docs: https://docs.treasuredata.com/articles/td-agent
  Process: 3451 ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid (code=exited, status=0/SUCCESS)
 Main PID: 3456 (fluentd)
   CGroup: /system.slice/td-agent.service
           ├─3456 /opt/td-agent/embedded/bin/ruby /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid
           └─3461 /opt/td-agent/embedded/bin/ruby -Eascii-8bit:ascii-8bit /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-ag...

Feb 07 21:28:10 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Starting td-agent: Fluentd based data collector for Treasure Data...
Feb 07 21:28:10 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Started td-agent: Fluentd based data collector for Treasure Data.

Lambda 関数の実装

KPL は複数のログを1レコードに集約(aggregate)してKinesisに PutRecord します。 そのため、Consumer 側は集約されたログを分割(deaggregate)します。

awslabs/kinesis-aggregation で Python Lambda を実装

AWS が GitHub に公開している awslabs/kinesis-aggregation という Lambda 向け KPL の aggregate/deaggregate ライブラリを利用します。

  • Python
  • Java
  • Node.js

と各 Lambda ランタイム向けライブラリが含まれています。

  • C#
  • Golang

向けはまだ未対応です。

今回は Python のものを利用します。

$ git clone https://github.com/awslabs/kinesis-aggregation.git
$ cd kinesis-aggregation/python/
$ git checkout py3-compat

今回は Python3.6 向けの Lambda 関数を作成します。 WIP な py3-compat ブランチをチェックアウトします。

移動したディレクトリは、KPL で送信された集約ログを分解し、print するだけの Lambda 関数(lambda_function.py) がブループリントとして用意されているので、今回はこのファイルを利用します。

lambda_function.py の中身を抜粋

aws_kinesis_agg モジュールには集約されたログを分割する Lambda 向け(悪く言うと、Lambda イベントソース決め打ち)の関数が用意されています。あとは、その関数を呼び出して、各ログを処理するだけです。

  • ジェネレーターベース : iter_deaggregate_records
  • 非ジェネレーターベース : deaggregate_records

の2種類があります。

実際の Lambda 関数を一部抜粋します。

from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records
import base64
import six


def lambda_bulk_handler(event, context):
    """A Python AWS Lambda function to process Kinesis aggregated
    records in a bulk fashion."""

    raw_kinesis_records = event['Records']

    # Deaggregate all records in one call
    user_records = deaggregate_records(raw_kinesis_records)

    # Iterate through deaggregated records
    for record in user_records:        

        # Kinesis data in Python Lambdas is base64 encoded
        payload = base64.b64decode(record['kinesis']['data'])
        six.print_('%s' % payload)

    return 'Successfully processed {} records.'.format(len(user_records))

Python2/3 対応のために six ライブラリを利用していることを確認できます。なお、six の由来は 2 x 3 = 6 からです。

Lambda パッケージ(Zip)化

lambda_function.py を Lambda 関数として Zip ファイルにまとめるにはどうすればよいでしょうか?

直接利用している aws_kinesis_agg モジュールの他に、シリアライズに利用している Google Protobuf モジュールなども必要です。 Protobuf は Lambda パッケージ化の注意点が多いため、シュッと ZIP アーカイブを作成してくれる専用プログラム(make_lambda_build.py)が存在します。

$ python make_lambda_build.py
...
Successfully created Lambda build zip file: /Users/jsmith/dev/kinesis-aggregation/python/python_lambda_build.zip

lambda_function.py をカスタマイズして、依存ライブラリを追加した時は、make_lambda_build.py の上のほうにある定数 PIP_DEPENDENCIES に依存ライブラリを追加してください。

make_lambda_build.py

#Add your PIP dependencies here
PIP_DEPENDENCIES = ['protobuf']

AWSマネージメントコンソールからLambda 関数を登録

作成したZipファイルを管理画面からアップロードして Lambda 関数を以下の内容で作成します。

  • Runtime : Python 3.6

  • Handler : lambda_function.lambda_generator_handler

  • Event sources : “test” Kinesis Streams

CLoudWatch Logs から Lambda の実行ログを確認

Lambda 関数内では、分割した各ログデータを print 出力しています。 CloudWatch > Log Groups > Streams for /aws/lambda/YOUR_LAMBDA_FUNCTION_NAME のログストリームに、deaggregate されたログが出力されていることを確認します。

無事 Apache のアクセスログが出力されていれば、EC2(AL2) -> fluentd -> KPL -> Kinesis Data Streams -> Lambda とデータ連携されたことを意味します。

まとめ

今回は fluentd/KPL on Amazon Linux 2 -> Amazon Kinesis Data Streams -> Lambda(Python 3.6) のデータ連携・ログ集約を実装しました。

AL2 が systemd を採用している点を除き、 AL1 時代と同様の手順で構築でき、AL1 から AL2 への移行もスムーズと思われます。

KPL でデータ集約した場合とデータ集約しなかった場合のスループットの違いは、過去のブログ記事を参照下さい

Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる

あとは KPL が Amazon Kinesis Data Firehose に対応してくれると、最高なんですが、、、

Will KPL support writing to kinesis firehose? · Issue #29 · awslabs/amazon-kinesis-producer · GitHub

参考