Amazon Kinesis Firehoseを使ってApacheのログをステータスごとに出し分ける方法を考える
こんにちは、せーのです。今日は先日ご紹介した新機能のアレンジ編です。
イベント毎にファイルを分けたい
先日Firehoseから出力先にデータを送る際にLambdaをはさんでApacheのログを簡単にJSON化する機能をご紹介しました。
【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデータ変換機能が追加。Apacheログが簡単にJSON形式に変換可能に!|クラスメソッドブログ
Firehoseからデータを流す時にLambdaを間にはさんでETL処理することが出来る、しかもApacheやSyslogのデータ加工に関しては既にblueprintが用意されていて何もコードを書かなくても変換してくれる、という分析系の方には涙が溢れて止まらない機能でした。
こちらのブログはSNSで色んなコメントをいただき、概ね「便利そう」というものだったのですが、中にこのようなコメントがありました。
うーん。これだけではイベント毎にファイルを分けて書き出せないなあ…惜しい。イベント毎にストリームを分けるのは大仰だからな-。 / “【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデー…” https://t.co/cVnXjBgoG6
— kazutomo (@kazutomo) 2016年12月22日
なるほど。確かにステータスやIP、パスなどでファイルを分けるような機能はついていませんでした。どうにかして出来ないものでしょうか。考えてみましょう。
考えてみた
Athenaで取り出す
一番わかりやすいのはこの方法ではないでしょうか。S3には全てのログを入れておいてAthenaで欲しいステータスのものを拾い出す、という方法です。
こんな感じでApacheのJSONログをAthenaで拾ってるブログとかないかなー、とググってみると、なんとDevelopers.IOが引っかかりましたw
ApacheのアクセスログをAmazon Athenaを使って分析する|クラスメソッドブログ
もうこのまんまいけそうです。
Lambdaで分岐する
次にLambdaを使ってETLしているのだからそのLambdaを使って分岐してみよう、というものです。
Lambdaですから基本コードでなんとでもなりそうな感じがしますね。やってみましょう。
やってみた
まずは前回の記事と同じようにFirehose, S3, Lambdaを準備します。詳しい構築方法は前回の記事を参考にして下さい。
今回は加工前の生データもS3に残してみることにします。こんなことも設定一つでできる辺りはありがたいですね。
次にデータを流すためのAWS IoTを用意します。
軽くテストしてみます。
ファイルが入っていますね。中身を見てみます。
{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"200"}
JSON化されていますね。成功です。
次にETLをしているLambdaの中身を見てみましょう。
'use strict'; console.log('Loading function'); /* Apache Log format parser */ const parser = /^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})/; exports.handler = (event, context, callback) => { let success = 0; // Number of valid entries found let failure = 0; // Number of invalid entries found /* Process the list of records and transform them */ const output = event.records.map((record) => { const entry = (new Buffer(record.data, 'base64')).toString('utf8'); const match = parser.exec(entry); if (match) { /* Prepare JSON version from Apache log data */ const result = { host: match[1], ident: match[2], authuser: match[3], datetime: match[4], request: match[5], response: match[6], }; const payload = (new Buffer(JSON.stringify(result), 'utf8')).toString('base64'); success++; return { recordId: record.recordId, result: 'Ok', data: payload, }; } else { /* Failed event, notify the error and leave the record intact */ failure++; return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data, }; } }); console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`); callback(null, { records: output }); };
Common Log Formatの形式でマッチングをして、マッチしたらそれぞれの値をオブジェクトに入れ直してJSON.stringify()でJSONとして加工、Kinesis用にbase64に戻してCallbackしているんですね。
ということはこのresult.responseの値で分岐して、欲しいステータスのものが来たら別のファイルに吐き出せば良さそうです。実装してみます。
'use strict'; console.log('Loading function'); /* Apache Log format parser */ const parser = /^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})/; var AWS = require('aws-sdk'); var s3 = new AWS.S3(); var params = { Bucket: 'separate-status', Key: "error/500.log", Body: "" }; exports.handler = (event, context, callback) => { let success = 0; // Number of valid entries found let failure = 0; // Number of invalid entries found /* Process the list of records and transform them */ const output = event.records.map((record) => { const entry = (new Buffer(record.data, 'base64')).toString('utf8'); const match = parser.exec(entry); if (match) { /* Prepare JSON version from Apache log data */ const result = { host: match[1], ident: match[2], authuser: match[3], datetime: match[4], request: match[5], response: match[6], }; const payload = (new Buffer(JSON.stringify(result), 'utf8')).toString('base64'); if (Number(result.response) >= 500){ params.Body = JSON.stringify(result); s3.upload(params, function(err, data) { if (err) { console.log("Error uploading data: ", err); } else { console.log("Successfully uploaded data to error."); } return { recordId: record.recordId, result: 'Ok', data: payload, }; }); } else { success++; return { recordId: record.recordId, result: 'Ok', data: payload, }; } } else { /* Failed event, notify the error and leave the record intact */ failure++; return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data, }; } }); console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`); callback(null, { records: output }); };
LambdaのIAM RoleにS3へのアクセス許可を加えて、もう一度テストしてみます。今度はステータスを501にしてみましょう。
バケットを見てみます。
指定したフォルダができていますね。
新しいファイルが出来上がりました。中身を見てみます。
{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"501"}
501のレスポンスのものだけデータがきていますね。成功です。今回は固定のファイル名にしましたが、本番で使う際にはUnixtimeをつけるなどして上書きされないようにしてみて下さい。
ちなみに元データのファイルものぞいてみます。
127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 501 2326
元データがきちんと無加工で入っています。
まとめ
いかがでしたでしょうか。前回のブログのちょい足しレシピをご紹介しました。基本はAthenaで取り出す、どうしてもファイルを分けたい方は条件をLambdaに書き込めば可能です。是非お試しください。