Kinesis Producer Library(KPL)とfluentdとLambda(Python3)を連携させる(Amazon Linux2版)
AWS のストリーミングサービス Kinesis には、データ・インジェスションの性能を高め、運用を楽にするKinesis Producer Library(以下 KPL)が存在します。
ユースケース
2016年6月には、以下のような構成で KPL でデータ連携する構築手順を紹介しました。
- Amazon Kinesis Data Streams
- KPL
- fluentd
- Amazon Linux 1
- Lambda(Python 2.7)
Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる
今回はそのアップデート版として
- Amazon Linux → Amazon Linux 2(以下 AL2)
- Lambda(Python 2.7 → Python 3.6)
に更新した手順を紹介します。
Amazon Linux 2 は 2018/02/11 時点で release candidate (RC) です。
KPL レコードの deaggregate に利用する kinesis-aggregation ライブラリの Python 3 対応は 2018/02/11 時点で WIP です。
変更点
構築上の大きな変更点には以下があります。
- AL2 への変更により systemd で flutend プロセスを管理
- KPL データのデシリアライズに利用するライブラリ kinesis-aggregation は Python 3 対応した 1.1 以上を利用
KPL について
KPL は 効率的に Kinesis Data Streams に書きこむライブラリで、以下の様な特徴があります
- 自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む
- レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
- ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
- コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する
- Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする
KPL のデータを処理する Consumer は今回紹介する Lambda 以外にも Kinesis Client Library(KCL) を利用するアプローチもよく採用されます。
KCL は EC2 上で動作するため、 Lambda に比べて自由度が高い一方で、ユーザーが管理する範囲が大幅に増えます。マネージドサービス利点を活かし、Lambda 内のロジックに注力するか、EC2 上で自分たちで好きにやるか。両者は一長一短です。サービスや開発チームの特性に合わせて選択ください。
KPL の詳細は次のドキュメントを参照ください。
- http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html
- https://blogs.aws.amazon.com/bigdata/post/Tx3ET30EGDKUUI2/Implementing-Efficient-and-Reliable-Producers-with-the-Amazon-Kinesis-Producer-L
KPL の動作確認内容
- ストリームには Amazon Kinesis Data Streams を利用
- Producer には aws-fluent-plugin-kinesis を使い、 KPL で aggregate させる
- Consumer には Lambda を使い kinesis-aggregation ライブラリ でログを deaggregate させる
- 転送ログは Apache のアクセスログを利用
検証環境
- OS : Amazon Linux 2 LTS Candidate AMI 2017.12.0 (HVM), SSD Volume Type - ami-1b2bb774
- fluentd : td-agent 3.1.1-0.el2
- fluent-plugin-kinesis : 2.1.0
- kinesis-aggregation : リリース前の py3-compat ブランチ
- Apache HTTP Server : Apache/2.4.6
では、順に構築します。
最終形
EC2 に IAM Role の設定
サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "cloudwatch:PutMetricData" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "kinesis:*" ], "Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*” } ] }
Kinesis ストリームの操作を許可するほか、KPL はカスタムメトリクスも利用するため"cloudwatch:PutMetricData”
も許可してください。
許可し忘れていると、fluentd が以下の様なcloudwatch:PutMetricData
の実行エラーログを吐きます。
<ErrorResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/"> <Error> <Type>Sender</Type> <Code>AccessDenied</Code> <Message>User: arn:aws:sts::123456789012:assumed-role/KinesisFluentd/i-7a6a0ae5 is not authorized to perform: cloudwatch:PutMetricData</Message> </Error> <RequestId>dd56c5ed-299b-11e6-b9b2-299f16fe361d</RequestId> </ErrorResponse>
ポリシーの詳細は次のドキュメントを参照ください。
http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html
Kinesis Data Stream の作成
Kinesis Data Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。
$ STREAM_NAME=test $ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 2 $ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus` becomes `ACTIVE`
Apache のインストール
今回は Apache のログを Kinesis Data Stream に送信します。 Apache をインストールします。
$ sudo yum install -y httpd
systemctl でプロセス起動させます。
$ sudo systemctl start httpd.service $ sudo systemctl status httpd.service ● httpd.service - The Apache HTTP Server Loaded: loaded (/usr/lib/systemd/system/httpd.service; disabled; vendor preset: disabled) Active: active (running) since Wed 2018-02-07 21:37:07 UTC; 5s ago Docs: man:httpd(8) man:apachectl(8) Main PID: 3301 (httpd) Status: "Processing requests..." CGroup: /system.slice/httpd.service ├─3301 /usr/sbin/httpd -DFOREGROUND ├─3302 /usr/sbin/httpd -DFOREGROUND ├─3303 /usr/sbin/httpd -DFOREGROUND ├─3304 /usr/sbin/httpd -DFOREGROUND ├─3305 /usr/sbin/httpd -DFOREGROUND └─3306 /usr/sbin/httpd -DFOREGROUND Feb 07 21:37:07 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Starting The Apache HTTP Server... Feb 07 21:37:07 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Started The Apache HTTP Server.
HTTP アクセスして、Apache が起動していることを確認しましょう。
$ sudo touch /var/www/html/index.html # index.html ファイルを作成 $ curl -I localhost HTTP/1.1 200 OK Date: Wed, 07 Feb 2018 21:41:17 GMT Server: Apache/2.4.6 (Amazon Linux 2) Last-Modified: Wed, 07 Feb 2018 21:41:14 GMT ETag: "0-564a627886c85" Accept-Ranges: bytes Content-Type: text/html; charset=UTF-8
fluentd の設定
最後に fluentd の設定を行います。
fluentd のインストール
fluentd の安定版である td-agent をインストールします。
インストールスクリプトは AL1 向けと AL2 向けで異なります。 今回は、当然ながら AL2 向けのインストールスクリプトを利用します。
$ curl -L https://toolbelt.treasuredata.com/sh/install-amazon2-td-agent3.sh | sh ... Installed: td-agent.x86_64 0:3.1.1-0.el2 Dependency Installed: avahi-libs.x86_64 0:0.6.31-17.amzn2 cups-client.x86_64 1:1.6.3-29.amzn2 cups-libs.x86_64 1:1.6.3-29.amzn2 m4.x86_64 0:1.4.16-10.amzn2 mailx.x86_64 0:12.5-16.amzn2 patch.x86_64 0:2.7.1-8.amzn2 spax.x86_64 0:1.5.2-13.amzn2 system-lsb-core.x86_64 0:4.1-27.amzn2.3 system-lsb-submod-security.x86_64 0:4.1-27.amzn2.3 Complete! Installation completed. Happy Logging! NOTE: In case you need any of these: 1) security tested binary with a clear life cycle management 2) advanced monitoring and management 3) support SLA Please check Fluentd Enterprise (https://www.treasuredata.com/fluentd/).
fluentd の Amazon Kinesis プラグインのインストール
Amazon Kinesis プラグインとして fluent-plugin-kinesis をインストールします。
$ sudo td-agent-gem install fluent-plugin-kinesis ... 131 gems installed
KPL 向けに protobuf など、いろいろな gem がインストールされています。
fluentd 設定ファイルの修正
/etc/td-agent/td-agent.conf
にある設定ファイルを修正します。
アクセスログファイル/var/log/httpd/access_log
を tail
し、書き込まれたデータを Kinesis Stream に送信します。
データソースの定義が <source>
タグ、データ処理の定義が<match>
タグです。
<source> @type tail format apache2 path /var/log/httpd/access_log pos_file /var/log/td-agent/httpd-access.pos tag log.httpd.access </source> <match log.httpd.*> @type kinesis_streams_aggregated stream_name kpl-stream region eu-west-1 </match>
fluentd を KPL の Producer として利用するには match ディレクティブの type を kinesis_producer
にします。
詳細は次のドキュメントを参照ください。
https://github.com/awslabs/aws-fluent-plugin-kinesis
fluentd の設定ファイルのシンタックスチェック
設定ファイルのシンタックスチェックをします。
$ td-agent --dry-run -c /etc/td-agent/td-agent.conf 2018-02-07 20:53:37 +0000 [info]: parsing config file is succeeded path="/etc/td-agent/td-agent.conf" 2018-02-07 20:53:37 +0000 [info]: starting fluentd-1.0.2 as dry run mode ruby="2.4.2" $ echo $? 0
エラーメッセージは表示されません。
flutend の systemd ファイルの変更
flurentd プロセスはデフォルトでは td-agent ユーザー・グループ権限で起動されます。
Apache のログディレクトリの権限変更・グループ操作などを行わない場合、Apache のログ読み込み時に以下のエラーが発生します。
2018-02-07 21:08:02 +0000 [error]: #0 unexpected error error_class=Errno::EACCES error="Permission denied @ rb_sysopen - /var/log/td-agent/httpd-access.pos"
今回は fluentd プロセスをルート権限で実行させます。
[Unit] Description=td-agent: Fluentd based data collector for Treasure Data Documentation=https://docs.treasuredata.com/articles/td-agent After=network-online.target Wants=network-online.target [Service] # User=td-agent # Group=td-agent User=root Group=root LimitNOFILE=65536 Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/ Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/ Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf Environment=FLUENT_PLUGIN=/etc/td-agent/plugin Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock PIDFile=/var/run/td-agent/td-agent.pid RuntimeDirectory=td-agent Type=forking ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid ExecStop=/bin/kill -TERM ${MAINPID} ExecReload=/bin/kill -HUP ${MAINPID} Restart=always TimeoutStopSec=120 [Install] WantedBy=multi-user.target
fluentd の起動
fluentd をフォアグラウンド実行してみましょう。
$ sudo td-agent -c /etc/td-agent/td-agent.conf 2018-02-10 12:21:21 +0000 [info]: parsing config file is succeeded path="/etc/td-agent/td-agent.conf" 2018-02-10 12:21:21 +0000 [info]: using configuration file: <ROOT> <source> @type tail format apache2 path "/var/log/httpd/access_log" pos_file "/var/log/td-agent/httpd-access.pos" tag "log.httpd.access" <parse> @type apache2 </parse> </source> <match log.httpd.*> @type kinesis_streams_aggregated stream_name "test" region "ap-northeast-1" <buffer> flush_mode interval retry_type exponential_backoff </buffer> </match> </ROOT> 2018-02-10 12:21:21 +0000 [info]: starting fluentd-1.0.2 pid=3252 ruby="2.4.2" 2018-02-10 12:21:21 +0000 [info]: spawn command to main: cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/sbin/td-agent", "-c", "/etc/td-agent/td-agent.conf", "--under-supervisor"] 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '2.4.0' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-kafka' version '0.6.5' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-kinesis' version '2.1.0' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.0.1' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-s3' version '1.1.0' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-td' version '1.0.0' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.3' 2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-webhdfs' version '1.2.2' 2018-02-10 12:21:21 +0000 [info]: gem 'fluentd' version '1.0.2' 2018-02-10 12:21:21 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_streams_aggregated" 2018-02-10 12:21:21 +0000 [info]: adding source type="tail" 2018-02-10 12:21:21 +0000 [info]: #0 starting fluentd worker pid=3256 ppid=3252 worker=0 2018-02-10 12:21:21 +0000 [info]: #0 following tail of /var/log/httpd/access_log 2018-02-10 12:21:21 +0000 [info]: #0 fluentd worker is now running worker=0
Apache にリクエストをして、Kinesis Data Stream に送信するデータを生成します。
$ curl localhost
問題が見つからなければ、 fluentd を systemctl
でバックグラウンド実行しましょう。
$ sudo systemctl start td-agent.service $ sudo systemctl status td-agent.service ● td-agent.service - td-agent: Fluentd based data collector for Treasure Data Loaded: loaded (/usr/lib/systemd/system/td-agent.service; disabled; vendor preset: disabled) Active: active (running) since Wed 2018-02-07 21:28:10 UTC; 7s ago Docs: https://docs.treasuredata.com/articles/td-agent Process: 3451 ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid (code=exited, status=0/SUCCESS) Main PID: 3456 (fluentd) CGroup: /system.slice/td-agent.service ├─3456 /opt/td-agent/embedded/bin/ruby /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid └─3461 /opt/td-agent/embedded/bin/ruby -Eascii-8bit:ascii-8bit /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-ag... Feb 07 21:28:10 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Starting td-agent: Fluentd based data collector for Treasure Data... Feb 07 21:28:10 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Started td-agent: Fluentd based data collector for Treasure Data.
Lambda 関数の実装
KPL は複数のログを1レコードに集約(aggregate)してKinesisに PutRecord
します。
そのため、Consumer 側は集約されたログを分割(deaggregate)します。
awslabs/kinesis-aggregation で Python Lambda を実装
AWS が GitHub に公開している awslabs/kinesis-aggregation という Lambda 向け KPL の aggregate/deaggregate ライブラリを利用します。
- Python
- Java
- Node.js
と各 Lambda ランタイム向けライブラリが含まれています。
- C#
- Golang
向けはまだ未対応です。
今回は Python のものを利用します。
$ git clone https://github.com/awslabs/kinesis-aggregation.git $ cd kinesis-aggregation/python/ $ git checkout py3-compat
今回は Python3.6 向けの Lambda 関数を作成します。
WIP な py3-compat
ブランチをチェックアウトします。
移動したディレクトリは、KPL で送信された集約ログを分解し、print するだけの Lambda 関数(lambda_function.py
) がブループリントとして用意されているので、今回はこのファイルを利用します。
lambda_function.py の中身を抜粋
aws_kinesis_agg モジュールには集約されたログを分割する Lambda 向け(悪く言うと、Lambda イベントソース決め打ち)の関数が用意されています。あとは、その関数を呼び出して、各ログを処理するだけです。
- ジェネレーターベース : iter_deaggregate_records
- 非ジェネレーターベース : deaggregate_records
の2種類があります。
実際の Lambda 関数を一部抜粋します。
from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records import base64 import six def lambda_bulk_handler(event, context): """A Python AWS Lambda function to process Kinesis aggregated records in a bulk fashion.""" raw_kinesis_records = event['Records'] # Deaggregate all records in one call user_records = deaggregate_records(raw_kinesis_records) # Iterate through deaggregated records for record in user_records: # Kinesis data in Python Lambdas is base64 encoded payload = base64.b64decode(record['kinesis']['data']) six.print_('%s' % payload) return 'Successfully processed {} records.'.format(len(user_records))
Python2/3 対応のために six ライブラリを利用していることを確認できます。なお、six の由来は 2 x 3 = 6 からです。
Lambda パッケージ(Zip)化
lambda_function.py
を Lambda 関数として Zip ファイルにまとめるにはどうすればよいでしょうか?
直接利用している aws_kinesis_agg モジュールの他に、シリアライズに利用している Google Protobuf モジュールなども必要です。
Protobuf は Lambda パッケージ化の注意点が多いため、シュッと ZIP アーカイブを作成してくれる専用プログラム(make_lambda_build.py
)が存在します。
$ python make_lambda_build.py ... Successfully created Lambda build zip file: /Users/jsmith/dev/kinesis-aggregation/python/python_lambda_build.zip
lambda_function.py
をカスタマイズして、依存ライブラリを追加した時は、make_lambda_build.py
の上のほうにある定数 PIP_DEPENDENCIES
に依存ライブラリを追加してください。
#Add your PIP dependencies here PIP_DEPENDENCIES = ['protobuf']
AWSマネージメントコンソールからLambda 関数を登録
作成したZipファイルを管理画面からアップロードして Lambda 関数を以下の内容で作成します。
- Runtime : Python 3.6
-
Handler : lambda_function.lambda_generator_handler
- Event sources : “test” Kinesis Streams
CLoudWatch Logs から Lambda の実行ログを確認
Lambda 関数内では、分割した各ログデータを print 出力しています。 CloudWatch > Log Groups > Streams for /aws/lambda/YOUR_LAMBDA_FUNCTION_NAME のログストリームに、deaggregate されたログが出力されていることを確認します。
無事 Apache のアクセスログが出力されていれば、EC2(AL2) -> fluentd -> KPL -> Kinesis Data Streams -> Lambda とデータ連携されたことを意味します。
まとめ
今回は fluentd/KPL on Amazon Linux 2 -> Amazon Kinesis Data Streams -> Lambda(Python 3.6) のデータ連携・ログ集約を実装しました。
AL2 が systemd を採用している点を除き、 AL1 時代と同様の手順で構築でき、AL1 から AL2 への移行もスムーズと思われます。
KPL でデータ集約した場合とデータ集約しなかった場合のスループットの違いは、過去のブログ記事を参照下さい
Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる
あとは KPL が Amazon Kinesis Data Firehose に対応してくれると、最高なんですが、、、
Will KPL support writing to kinesis firehose? · Issue #29 · awslabs/amazon-kinesis-producer · GitHub
参考
- python lib is not maintained (no python3 support) · Issue #22 · awslabs/kinesis-aggregation · GitHub
- http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html
- https://github.com/awslabs/amazon-kinesis-producer
- https://github.com/awslabs/aws-fluent-plugin-kinesis
- https://github.com/awslabs/kinesis-aggregation