この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWS のストリーミングサービス Kinesis には、データ・インジェスションの性能を高め、運用を楽にするKinesis Producer Library(以下 KPL)が存在します。
ユースケース
2016年6月には、以下のような構成で KPL でデータ連携する構築手順を紹介しました。
- Amazon Kinesis Data Streams
- KPL
- fluentd
- Amazon Linux 1
- Lambda(Python 2.7)
Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる
今回はそのアップデート版として
- Amazon Linux → Amazon Linux 2(以下 AL2)
- Lambda(Python 2.7 → Python 3.6)
に更新した手順を紹介します。
Amazon Linux 2 は 2018/02/11 時点で release candidate (RC) です。
KPL レコードの deaggregate に利用する kinesis-aggregation ライブラリの Python 3 対応は 2018/02/11 時点で WIP です。
変更点
構築上の大きな変更点には以下があります。
- AL2 への変更により systemd で flutend プロセスを管理
- KPL データのデシリアライズに利用するライブラリ kinesis-aggregation は Python 3 対応した 1.1 以上を利用
KPL について
KPL は 効率的に Kinesis Data Streams に書きこむライブラリで、以下の様な特徴があります
- 自動的で設定可能な再試行メカニズムにより 1 つ以上の Amazon Kinesis stream へ書き込む
- レコードを収集し、PutRecords を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
- ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
- コンシューマーで Amazon Kinesis Client Library(KCL)とシームレスに統合して、バッチ処理されたレコードを集約解除する
- Amazon CloudWatch メトリックスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする
KPL のデータを処理する Consumer は今回紹介する Lambda 以外にも Kinesis Client Library(KCL) を利用するアプローチもよく採用されます。
KCL は EC2 上で動作するため、 Lambda に比べて自由度が高い一方で、ユーザーが管理する範囲が大幅に増えます。マネージドサービス利点を活かし、Lambda 内のロジックに注力するか、EC2 上で自分たちで好きにやるか。両者は一長一短です。サービスや開発チームの特性に合わせて選択ください。
KPL の詳細は次のドキュメントを参照ください。
- http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html
- https://blogs.aws.amazon.com/bigdata/post/Tx3ET30EGDKUUI2/Implementing-Efficient-and-Reliable-Producers-with-the-Amazon-Kinesis-Producer-L
KPL の動作確認内容
- ストリームには Amazon Kinesis Data Streams を利用
- Producer には aws-fluent-plugin-kinesis を使い、 KPL で aggregate させる
- Consumer には Lambda を使い kinesis-aggregation ライブラリ でログを deaggregate させる
- 転送ログは Apache のアクセスログを利用
検証環境
- OS : Amazon Linux 2 LTS Candidate AMI 2017.12.0 (HVM), SSD Volume Type - ami-1b2bb774
- fluentd : td-agent 3.1.1-0.el2
- fluent-plugin-kinesis : 2.1.0
- kinesis-aggregation : リリース前の py3-compat ブランチ
- Apache HTTP Server : Apache/2.4.6
では、順に構築します。
最終形
EC2 に IAM Role の設定
サーバーにAWS認証情報を配布して、リクエスト時に認証情報を利用する代わりに、EC2インスタンスに IAM Role を設定してリソース操作を許可します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"cloudwatch:PutMetricData"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"kinesis:*"
],
"Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/*”
}
]
}
Kinesis ストリームの操作を許可するほか、KPL はカスタムメトリクスも利用するため"cloudwatch:PutMetricData”
も許可してください。
許可し忘れていると、fluentd が以下の様なcloudwatch:PutMetricData
の実行エラーログを吐きます。
<ErrorResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
<Error>
<Type>Sender</Type>
<Code>AccessDenied</Code>
<Message>User: arn:aws:sts::123456789012:assumed-role/KinesisFluentd/i-7a6a0ae5 is not authorized to perform: cloudwatch:PutMetricData</Message>
</Error>
<RequestId>dd56c5ed-299b-11e6-b9b2-299f16fe361d</RequestId>
</ErrorResponse>
ポリシーの詳細は次のドキュメントを参照ください。
http://docs.aws.amazon.com/kinesis/latest/dev/controlling-access.html
Kinesis Data Stream の作成
Kinesis Data Stream をコマンドラインから設定します。 今回はストリーム名は「test」とします。
$ STREAM_NAME=test
$ aws kinesis create-stream --stream-name $STREAM_NAME --shard-count 2
$ aws kinesis wait stream-exists --stream-name $STREAM_NAME # wait until `StreamStatus` becomes `ACTIVE`
Apache のインストール
今回は Apache のログを Kinesis Data Stream に送信します。 Apache をインストールします。
$ sudo yum install -y httpd
systemctl でプロセス起動させます。
$ sudo systemctl start httpd.service
$ sudo systemctl status httpd.service
● httpd.service - The Apache HTTP Server
Loaded: loaded (/usr/lib/systemd/system/httpd.service; disabled; vendor preset: disabled)
Active: active (running) since Wed 2018-02-07 21:37:07 UTC; 5s ago
Docs: man:httpd(8)
man:apachectl(8)
Main PID: 3301 (httpd)
Status: "Processing requests..."
CGroup: /system.slice/httpd.service
├─3301 /usr/sbin/httpd -DFOREGROUND
├─3302 /usr/sbin/httpd -DFOREGROUND
├─3303 /usr/sbin/httpd -DFOREGROUND
├─3304 /usr/sbin/httpd -DFOREGROUND
├─3305 /usr/sbin/httpd -DFOREGROUND
└─3306 /usr/sbin/httpd -DFOREGROUND
Feb 07 21:37:07 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Starting The Apache HTTP Server...
Feb 07 21:37:07 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Started The Apache HTTP Server.
HTTP アクセスして、Apache が起動していることを確認しましょう。
$ sudo touch /var/www/html/index.html # index.html ファイルを作成
$ curl -I localhost
HTTP/1.1 200 OK
Date: Wed, 07 Feb 2018 21:41:17 GMT
Server: Apache/2.4.6 (Amazon Linux 2)
Last-Modified: Wed, 07 Feb 2018 21:41:14 GMT
ETag: "0-564a627886c85"
Accept-Ranges: bytes
Content-Type: text/html; charset=UTF-8
fluentd の設定
最後に fluentd の設定を行います。
fluentd のインストール
fluentd の安定版である td-agent をインストールします。
インストールスクリプトは AL1 向けと AL2 向けで異なります。 今回は、当然ながら AL2 向けのインストールスクリプトを利用します。
$ curl -L https://toolbelt.treasuredata.com/sh/install-amazon2-td-agent3.sh | sh
...
Installed:
td-agent.x86_64 0:3.1.1-0.el2
Dependency Installed:
avahi-libs.x86_64 0:0.6.31-17.amzn2 cups-client.x86_64 1:1.6.3-29.amzn2 cups-libs.x86_64 1:1.6.3-29.amzn2 m4.x86_64 0:1.4.16-10.amzn2
mailx.x86_64 0:12.5-16.amzn2 patch.x86_64 0:2.7.1-8.amzn2 spax.x86_64 0:1.5.2-13.amzn2 system-lsb-core.x86_64 0:4.1-27.amzn2.3
system-lsb-submod-security.x86_64 0:4.1-27.amzn2.3
Complete!
Installation completed. Happy Logging!
NOTE: In case you need any of these:
1) security tested binary with a clear life cycle management
2) advanced monitoring and management
3) support SLA
Please check Fluentd Enterprise (https://www.treasuredata.com/fluentd/).
fluentd の Amazon Kinesis プラグインのインストール
Amazon Kinesis プラグインとして fluent-plugin-kinesis をインストールします。
$ sudo td-agent-gem install fluent-plugin-kinesis
...
131 gems installed
KPL 向けに protobuf など、いろいろな gem がインストールされています。
fluentd 設定ファイルの修正
/etc/td-agent/td-agent.conf
にある設定ファイルを修正します。
アクセスログファイル/var/log/httpd/access_log
を tail
し、書き込まれたデータを Kinesis Stream に送信します。
データソースの定義が <source>
タグ、データ処理の定義が<match>
タグです。
/etc/td-agent/td-agent.conf
<source>
@type tail
format apache2
path /var/log/httpd/access_log
pos_file /var/log/td-agent/httpd-access.pos
tag log.httpd.access
</source>
<match log.httpd.*>
@type kinesis_streams_aggregated
stream_name kpl-stream
region eu-west-1
</match>
fluentd を KPL の Producer として利用するには match ディレクティブの type を kinesis_producer
にします。
詳細は次のドキュメントを参照ください。
https://github.com/awslabs/aws-fluent-plugin-kinesis
fluentd の設定ファイルのシンタックスチェック
設定ファイルのシンタックスチェックをします。
$ td-agent --dry-run -c /etc/td-agent/td-agent.conf
2018-02-07 20:53:37 +0000 [info]: parsing config file is succeeded path="/etc/td-agent/td-agent.conf"
2018-02-07 20:53:37 +0000 [info]: starting fluentd-1.0.2 as dry run mode ruby="2.4.2"
$ echo $?
0
エラーメッセージは表示されません。
flutend の systemd ファイルの変更
flurentd プロセスはデフォルトでは td-agent ユーザー・グループ権限で起動されます。
Apache のログディレクトリの権限変更・グループ操作などを行わない場合、Apache のログ読み込み時に以下のエラーが発生します。
2018-02-07 21:08:02 +0000 [error]: #0 unexpected error error_class=Errno::EACCES error="Permission denied @ rb_sysopen - /var/log/td-agent/httpd-access.pos"
今回は fluentd プロセスをルート権限で実行させます。
/usr/lib/systemd/system/td-agent.service
[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target
[Service]
# User=td-agent
# Group=td-agent
User=root
Group=root
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120
[Install]
WantedBy=multi-user.target
fluentd の起動
fluentd をフォアグラウンド実行してみましょう。
$ sudo td-agent -c /etc/td-agent/td-agent.conf
2018-02-10 12:21:21 +0000 [info]: parsing config file is succeeded path="/etc/td-agent/td-agent.conf"
2018-02-10 12:21:21 +0000 [info]: using configuration file: <ROOT>
<source>
@type tail
format apache2
path "/var/log/httpd/access_log"
pos_file "/var/log/td-agent/httpd-access.pos"
tag "log.httpd.access"
<parse>
@type apache2
</parse>
</source>
<match log.httpd.*>
@type kinesis_streams_aggregated
stream_name "test"
region "ap-northeast-1"
<buffer>
flush_mode interval
retry_type exponential_backoff
</buffer>
</match>
</ROOT>
2018-02-10 12:21:21 +0000 [info]: starting fluentd-1.0.2 pid=3252 ruby="2.4.2"
2018-02-10 12:21:21 +0000 [info]: spawn command to main: cmdline=["/opt/td-agent/embedded/bin/ruby", "-Eascii-8bit:ascii-8bit", "/sbin/td-agent", "-c", "/etc/td-agent/td-agent.conf", "--under-supervisor"]
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '2.4.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-kafka' version '0.6.5'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-kinesis' version '2.1.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.0.1'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-s3' version '1.1.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-td' version '1.0.0'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.3'
2018-02-10 12:21:21 +0000 [info]: gem 'fluent-plugin-webhdfs' version '1.2.2'
2018-02-10 12:21:21 +0000 [info]: gem 'fluentd' version '1.0.2'
2018-02-10 12:21:21 +0000 [info]: adding match pattern="log.httpd.*" type="kinesis_streams_aggregated"
2018-02-10 12:21:21 +0000 [info]: adding source type="tail"
2018-02-10 12:21:21 +0000 [info]: #0 starting fluentd worker pid=3256 ppid=3252 worker=0
2018-02-10 12:21:21 +0000 [info]: #0 following tail of /var/log/httpd/access_log
2018-02-10 12:21:21 +0000 [info]: #0 fluentd worker is now running worker=0
Apache にリクエストをして、Kinesis Data Stream に送信するデータを生成します。
$ curl localhost
問題が見つからなければ、 fluentd を systemctl
でバックグラウンド実行しましょう。
$ sudo systemctl start td-agent.service
$ sudo systemctl status td-agent.service
● td-agent.service - td-agent: Fluentd based data collector for Treasure Data
Loaded: loaded (/usr/lib/systemd/system/td-agent.service; disabled; vendor preset: disabled)
Active: active (running) since Wed 2018-02-07 21:28:10 UTC; 7s ago
Docs: https://docs.treasuredata.com/articles/td-agent
Process: 3451 ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid (code=exited, status=0/SUCCESS)
Main PID: 3456 (fluentd)
CGroup: /system.slice/td-agent.service
├─3456 /opt/td-agent/embedded/bin/ruby /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid
└─3461 /opt/td-agent/embedded/bin/ruby -Eascii-8bit:ascii-8bit /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-ag...
Feb 07 21:28:10 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Starting td-agent: Fluentd based data collector for Treasure Data...
Feb 07 21:28:10 ip-XXX.ap-northeast-1.compute.internal systemd[1]: Started td-agent: Fluentd based data collector for Treasure Data.
Lambda 関数の実装
KPL は複数のログを1レコードに集約(aggregate)してKinesisに PutRecord
します。
そのため、Consumer 側は集約されたログを分割(deaggregate)します。
awslabs/kinesis-aggregation で Python Lambda を実装
AWS が GitHub に公開している awslabs/kinesis-aggregation という Lambda 向け KPL の aggregate/deaggregate ライブラリを利用します。
- Python
- Java
- Node.js
と各 Lambda ランタイム向けライブラリが含まれています。
- C#
- Golang
向けはまだ未対応です。
今回は Python のものを利用します。
$ git clone https://github.com/awslabs/kinesis-aggregation.git
$ cd kinesis-aggregation/python/
$ git checkout py3-compat
今回は Python3.6 向けの Lambda 関数を作成します。
WIP な py3-compat
ブランチをチェックアウトします。
移動したディレクトリは、KPL で送信された集約ログを分解し、print するだけの Lambda 関数(lambda_function.py
) がブループリントとして用意されているので、今回はこのファイルを利用します。
lambda_function.py の中身を抜粋
aws_kinesis_agg モジュールには集約されたログを分割する Lambda 向け(悪く言うと、Lambda イベントソース決め打ち)の関数が用意されています。あとは、その関数を呼び出して、各ログを処理するだけです。
- ジェネレーターベース : iter_deaggregate_records
- 非ジェネレーターベース : deaggregate_records
の2種類があります。
実際の Lambda 関数を一部抜粋します。
from aws_kinesis_agg.deaggregator import deaggregate_records, iter_deaggregate_records
import base64
import six
def lambda_bulk_handler(event, context):
"""A Python AWS Lambda function to process Kinesis aggregated
records in a bulk fashion."""
raw_kinesis_records = event['Records']
# Deaggregate all records in one call
user_records = deaggregate_records(raw_kinesis_records)
# Iterate through deaggregated records
for record in user_records:
# Kinesis data in Python Lambdas is base64 encoded
payload = base64.b64decode(record['kinesis']['data'])
six.print_('%s' % payload)
return 'Successfully processed {} records.'.format(len(user_records))
Python2/3 対応のために six ライブラリを利用していることを確認できます。なお、six の由来は 2 x 3 = 6 からです。
Lambda パッケージ(Zip)化
lambda_function.py
を Lambda 関数として Zip ファイルにまとめるにはどうすればよいでしょうか?
直接利用している aws_kinesis_agg モジュールの他に、シリアライズに利用している Google Protobuf モジュールなども必要です。
Protobuf は Lambda パッケージ化の注意点が多いため、シュッと ZIP アーカイブを作成してくれる専用プログラム(make_lambda_build.py
)が存在します。
$ python make_lambda_build.py
...
Successfully created Lambda build zip file: /Users/jsmith/dev/kinesis-aggregation/python/python_lambda_build.zip
lambda_function.py
をカスタマイズして、依存ライブラリを追加した時は、make_lambda_build.py
の上のほうにある定数 PIP_DEPENDENCIES
に依存ライブラリを追加してください。
make_lambda_build.py
#Add your PIP dependencies here
PIP_DEPENDENCIES = ['protobuf']
AWSマネージメントコンソールからLambda 関数を登録
作成したZipファイルを管理画面からアップロードして Lambda 関数を以下の内容で作成します。
- Runtime : Python 3.6
-
Handler : lambda_function.lambda_generator_handler
- Event sources : “test” Kinesis Streams
CLoudWatch Logs から Lambda の実行ログを確認
Lambda 関数内では、分割した各ログデータを print 出力しています。 CloudWatch > Log Groups > Streams for /aws/lambda/YOUR_LAMBDA_FUNCTION_NAME のログストリームに、deaggregate されたログが出力されていることを確認します。
無事 Apache のアクセスログが出力されていれば、EC2(AL2) -> fluentd -> KPL -> Kinesis Data Streams -> Lambda とデータ連携されたことを意味します。
まとめ
今回は fluentd/KPL on Amazon Linux 2 -> Amazon Kinesis Data Streams -> Lambda(Python 3.6) のデータ連携・ログ集約を実装しました。
AL2 が systemd を採用している点を除き、 AL1 時代と同様の手順で構築でき、AL1 から AL2 への移行もスムーズと思われます。
KPL でデータ集約した場合とデータ集約しなかった場合のスループットの違いは、過去のブログ記事を参照下さい
Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる
あとは KPL が Amazon Kinesis Data Firehose に対応してくれると、最高なんですが、、、
Will KPL support writing to kinesis firehose? · Issue #29 · awslabs/amazon-kinesis-producer · GitHub
参考
- python lib is not maintained (no python3 support) · Issue #22 · awslabs/kinesis-aggregation · GitHub
- http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html
- https://github.com/awslabs/amazon-kinesis-producer
- https://github.com/awslabs/aws-fluent-plugin-kinesis
- https://github.com/awslabs/kinesis-aggregation