この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、せーのです。今日は先日ご紹介した新機能のアレンジ編です。
イベント毎にファイルを分けたい
先日Firehoseから出力先にデータを送る際にLambdaをはさんでApacheのログを簡単にJSON化する機能をご紹介しました。
【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデータ変換機能が追加。Apacheログが簡単にJSON形式に変換可能に!|クラスメソッドブログ
Firehoseからデータを流す時にLambdaを間にはさんでETL処理することが出来る、しかもApacheやSyslogのデータ加工に関しては既にblueprintが用意されていて何もコードを書かなくても変換してくれる、という分析系の方には涙が溢れて止まらない機能でした。
こちらのブログはSNSで色んなコメントをいただき、概ね「便利そう」というものだったのですが、中にこのようなコメントがありました。
うーん。これだけではイベント毎にファイルを分けて書き出せないなあ…惜しい。イベント毎にストリームを分けるのは大仰だからな-。 / “【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデー…” https://t.co/cVnXjBgoG6
— kazutomo (@kazutomo) 2016年12月22日
なるほど。確かにステータスやIP、パスなどでファイルを分けるような機能はついていませんでした。どうにかして出来ないものでしょうか。考えてみましょう。
考えてみた
Athenaで取り出す
一番わかりやすいのはこの方法ではないでしょうか。S3には全てのログを入れておいてAthenaで欲しいステータスのものを拾い出す、という方法です。
こんな感じでApacheのJSONログをAthenaで拾ってるブログとかないかなー、とググってみると、なんとDevelopers.IOが引っかかりましたw
ApacheのアクセスログをAmazon Athenaを使って分析する|クラスメソッドブログ
もうこのまんまいけそうです。
Lambdaで分岐する
次にLambdaを使ってETLしているのだからそのLambdaを使って分岐してみよう、というものです。
Lambdaですから基本コードでなんとでもなりそうな感じがしますね。やってみましょう。
やってみた
まずは前回の記事と同じようにFirehose, S3, Lambdaを準備します。詳しい構築方法は前回の記事を参考にして下さい。
今回は加工前の生データもS3に残してみることにします。こんなことも設定一つでできる辺りはありがたいですね。
次にデータを流すためのAWS IoTを用意します。
軽くテストしてみます。
ファイルが入っていますね。中身を見てみます。
{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"200"}
JSON化されていますね。成功です。
次にETLをしているLambdaの中身を見てみましょう。
'use strict';
console.log('Loading function');
/* Apache Log format parser */
const parser = /^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})/;
exports.handler = (event, context, callback) => {
let success = 0; // Number of valid entries found
let failure = 0; // Number of invalid entries found
/* Process the list of records and transform them */
const output = event.records.map((record) => {
const entry = (new Buffer(record.data, 'base64')).toString('utf8');
const match = parser.exec(entry);
if (match) {
/* Prepare JSON version from Apache log data */
const result = {
host: match[1],
ident: match[2],
authuser: match[3],
datetime: match[4],
request: match[5],
response: match[6],
};
const payload = (new Buffer(JSON.stringify(result), 'utf8')).toString('base64');
success++;
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
} else {
/* Failed event, notify the error and leave the record intact */
failure++;
return {
recordId: record.recordId,
result: 'ProcessingFailed',
data: record.data,
};
}
});
console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`);
callback(null, { records: output });
};
Common Log Formatの形式でマッチングをして、マッチしたらそれぞれの値をオブジェクトに入れ直してJSON.stringify()でJSONとして加工、Kinesis用にbase64に戻してCallbackしているんですね。
ということはこのresult.responseの値で分岐して、欲しいステータスのものが来たら別のファイルに吐き出せば良さそうです。実装してみます。
'use strict';
console.log('Loading function');
/* Apache Log format parser */
const parser = /^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})/;
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var params = {
Bucket: 'separate-status',
Key: "error/500.log",
Body: ""
};
exports.handler = (event, context, callback) => {
let success = 0; // Number of valid entries found
let failure = 0; // Number of invalid entries found
/* Process the list of records and transform them */
const output = event.records.map((record) => {
const entry = (new Buffer(record.data, 'base64')).toString('utf8');
const match = parser.exec(entry);
if (match) {
/* Prepare JSON version from Apache log data */
const result = {
host: match[1],
ident: match[2],
authuser: match[3],
datetime: match[4],
request: match[5],
response: match[6],
};
const payload = (new Buffer(JSON.stringify(result), 'utf8')).toString('base64');
if (Number(result.response) >= 500){
params.Body = JSON.stringify(result);
s3.upload(params, function(err, data) {
if (err) {
console.log("Error uploading data: ", err);
} else {
console.log("Successfully uploaded data to error.");
}
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
});
} else {
success++;
return {
recordId: record.recordId,
result: 'Ok',
data: payload,
};
}
} else {
/* Failed event, notify the error and leave the record intact */
failure++;
return {
recordId: record.recordId,
result: 'ProcessingFailed',
data: record.data,
};
}
});
console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`);
callback(null, { records: output });
};
LambdaのIAM RoleにS3へのアクセス許可を加えて、もう一度テストしてみます。今度はステータスを501にしてみましょう。
バケットを見てみます。
指定したフォルダができていますね。
新しいファイルが出来上がりました。中身を見てみます。
{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"501"}
501のレスポンスのものだけデータがきていますね。成功です。今回は固定のファイル名にしましたが、本番で使う際にはUnixtimeをつけるなどして上書きされないようにしてみて下さい。
ちなみに元データのファイルものぞいてみます。
127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 501 2326
元データがきちんと無加工で入っています。
まとめ
いかがでしたでしょうか。前回のブログのちょい足しレシピをご紹介しました。基本はAthenaで取り出す、どうしてもファイルを分けたい方は条件をLambdaに書き込めば可能です。是非お試しください。