ちょっと話題の記事

【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデータ変換機能が追加。Apacheログが簡単にJSON形式に変換可能に!

2016.12.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、せーのです。今日はKinesis Firehoseになかなか強力な機能が追加されたのでご紹介します。

これを待っていた

今回ご紹介するのは「データ変換機能」です。所謂ETL(Extract、Transform、Load)処理と言われるもので、流れてくるデータを使いやすい形に加工した上で対象となるデータベースやデータレイクに書き出すものです。

例えばApacheのログデータをS3に蓄積しておいて、後ほど分析に使いたいとします。Apacheのログってこんな感じですよね。

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326

この状態でS3に入っていた場合、分析に使うには一旦このデータをどこかのサーバに取り出し、分析用に加工して、もう一度データを貯め直す必要があります。 Kinesis Firehoseは細かいロジックやエージェント処理を抜きにしてS3やRedshift,Elasticsearch Service等にデータを流し込めるのが特徴です。つまり上のようなデータを流した場合、そのままS3などに入ってしまうので使いにくいですね。

今回ご紹介するデータ変換機能を使えばFirehoseからS3にデータを流し込む際にLambda Functionを間にはさむことが出来ます。ここで来たデータを自由に加工して使いやすい形にした上でS3に流すことができるわけです。これは分析系の仕事をされる方には待ってました!な機能ではないでしょうか。

firehose_transform14

blueprintつき

しかも既にLambda Functionには通常のETL用のblueprintの他によく使われるであろう

  • Apache LogをCSV形式に加工
  • Apache LogをJSON形式に加工
  • SyslogをCSV形式に加工
  • SyslogをJSON形式に加工

の4つの変換blueprintが既に用意されています。Apache LogやSyslogの加工ならほぼワンクリックで出来てしまいます。これはお得! なんか通販番組っぽくなってきました。

やってみた

それでは早速やってみましょう。今回はApacheのログをJSONに変えてS3に保存してみたいと思います。

Kinesis Firehoseの設定

まずFirehoseの設定です。Firehoseはまだ東京リージョンにはきていないのでバージニアリージョンで行います。

firehose_transform1

適当な名前をつけたS3バケット(US Standardのもの)を用意し、Firehoseの送り先に設定します。

firehose_transform2

Nextボタンを押すとLambda Functionの設定欄が追加されています。新規に作るので下の方にある[(Optional) Create a new Lambda function]をクリックします。

firehose_transform3

使用できるBlueprintの一覧が出てきました。[Apache Log to JSON]を選びます。ここからはLambdaの設定になります。

Lambda Functionの設定

[Apache Log to JSON]を選ぶとLambda Functionの設定画面になります。

firehose_transform4

と言ってもコードは既に書かれているのであとは新規にIAM Roleを作成してCreate Functionを押すだけです。

firehose_transform5

なんと、これでLambdaの設定はおしまいです。

ふたたびKinesis Fireose

作成したLambda Functionを選択します。

firehose_transform6

Lambdaのタイムアウトを1分以上にして下さい、という警告が出ています。Lambdaの設定からタイムアウトを変更します。

firehose_transform8

firehose_transform7

あとはこちらも新規にIAM Roleを作成してCreateすればOKです。ちなみにIAM Roleの権限はLambdaの方はCloudwatch Logsへのログ出力の権限、Firehoseの方はS3への出力とLambdaへのInvokeの権限が入っています。いずれも新規作成するとAWS側で自動に選択してくれるのでおまかせしてしまったほうがラクかと思います。

テスト

さて、構築が完了しました。テストしてみましょう。私はAWS IoTに慣れているのでAWS IoTからデータを流してみたいと思います。Rulesを作成します。

firehose_transform10

[test/transform]というTopicにデータがくるとFirehoseに流す、というものです。作成したらMQTT ClientよりApache Common Logのログデータを1行流してみます。

firehose_transform15

少し待ってS3を覗いてみます。

firehose_transform12

データが出てきました。中身を見てみます。

{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"200"}

JSON形式に加工されて出力されていますね。成功です。

ログ形式じゃないときはどうなんでしょう。Firehoseのデモデータを流してみます。

firehose_transform9

しばらく待つと

firehose_transform13

Failデータとして出てきました。

{"attemptsMade":1,"arrivalTimestamp":1482348186046,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1482348551493,"rawData":"eyJUSUNLRVJfU1lNQk9MIjoiU0FDIiwiU0VDVE9SIjoiRU5FUkdZIiwiQ0hBTkdFIjotMC4zNywiUFJJQ0UiOjU1LjJ9","lambdaArn":"arn:aws:lambda:us-east-1:887464593869:function:test-transform:$LATEST"}

Firehoseの中に入っているエンコード前のデータがエラー内容と共に出力されるようです。

まとめ

いかがでしたでしょうか。この機能をうまく使うことでデータの加工、蓄積がより簡単にできますね。是非試してみて下さい!

参考リンク