Amazon Kinesis Firehoseを使ってApacheのログをステータスごとに出し分ける方法を考える

Amazon Kinesis Firehoseを使ってApacheのログをステータスごとに出し分ける方法を考える

Clock Icon2016.12.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、せーのです。今日は先日ご紹介した新機能のアレンジ編です。

イベント毎にファイルを分けたい

先日Firehoseから出力先にデータを送る際にLambdaをはさんでApacheのログを簡単にJSON化する機能をご紹介しました。

【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデータ変換機能が追加。Apacheログが簡単にJSON形式に変換可能に!|クラスメソッドブログ

Firehoseからデータを流す時にLambdaを間にはさんでETL処理することが出来る、しかもApacheやSyslogのデータ加工に関しては既にblueprintが用意されていて何もコードを書かなくても変換してくれる、という分析系の方には涙が溢れて止まらない機能でした。

こちらのブログはSNSで色んなコメントをいただき、概ね「便利そう」というものだったのですが、中にこのようなコメントがありました。

うーん。これだけではイベント毎にファイルを分けて書き出せないなあ…惜しい。イベント毎にストリームを分けるのは大仰だからな-。 / “【新機能】Amazon Kinesis FirehoseにAmazon Lambdaを使ったデー…” https://t.co/cVnXjBgoG6

— kazutomo (@kazutomo) 2016年12月22日

なるほど。確かにステータスやIP、パスなどでファイルを分けるような機能はついていませんでした。どうにかして出来ないものでしょうか。考えてみましょう

考えてみた

Athenaで取り出す

一番わかりやすいのはこの方法ではないでしょうか。S3には全てのログを入れておいてAthenaで欲しいステータスのものを拾い出す、という方法です。

separate-status7

こんな感じでApacheのJSONログをAthenaで拾ってるブログとかないかなー、とググってみると、なんとDevelopers.IOが引っかかりましたw

ApacheのアクセスログをAmazon Athenaを使って分析する|クラスメソッドブログ

もうこのまんまいけそうです。

Lambdaで分岐する

次にLambdaを使ってETLしているのだからそのLambdaを使って分岐してみよう、というものです。

separate-status8

Lambdaですから基本コードでなんとでもなりそうな感じがしますね。やってみましょう。

やってみた

まずは前回の記事と同じようにFirehose, S3, Lambdaを準備します。詳しい構築方法は前回の記事を参考にして下さい。

separate-status1

今回は加工前の生データもS3に残してみることにします。こんなことも設定一つでできる辺りはありがたいですね。

separate-status2

次にデータを流すためのAWS IoTを用意します。

separate-status3

軽くテストしてみます。

separate-status5

ファイルが入っていますね。中身を見てみます。

{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"200"}

JSON化されていますね。成功です。

次にETLをしているLambdaの中身を見てみましょう。

'use strict';
console.log('Loading function');

/* Apache Log format parser */
const parser = /^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})/;

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        const entry = (new Buffer(record.data, 'base64')).toString('utf8');
        const match = parser.exec(entry);
        if (match) {
            /* Prepare JSON version from Apache log data */
            const result = {
                host: match[1],
                ident: match[2],
                authuser: match[3],
                datetime: match[4],
                request: match[5],
                response: match[6],
            };
            const payload = (new Buffer(JSON.stringify(result), 'utf8')).toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        } else {
            /* Failed event, notify the error and leave the record intact */
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
    });
    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, { records: output });
};

Common Log Formatの形式でマッチングをして、マッチしたらそれぞれの値をオブジェクトに入れ直してJSON.stringify()でJSONとして加工、Kinesis用にbase64に戻してCallbackしているんですね。

ということはこのresult.responseの値で分岐して、欲しいステータスのものが来たら別のファイルに吐き出せば良さそうです。実装してみます。

'use strict';
console.log('Loading function');

/* Apache Log format parser */
const parser = /^([\d.]+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3})/;

var AWS = require('aws-sdk');

var s3 = new AWS.S3();

var params = {
        Bucket: 'separate-status',
        Key:  "error/500.log",
        Body: ""
    };

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        const entry = (new Buffer(record.data, 'base64')).toString('utf8');
        const match = parser.exec(entry);
        if (match) {
            /* Prepare JSON version from Apache log data */
            const result = {
                host: match[1],
                ident: match[2],
                authuser: match[3],
                datetime: match[4],
                request: match[5],
                response: match[6],
            };
            const payload = (new Buffer(JSON.stringify(result), 'utf8')).toString('base64');
            
            if (Number(result.response) >= 500){
                params.Body = JSON.stringify(result);
                s3.upload(params, function(err, data) {
                    if (err) {
                      console.log("Error uploading data: ", err);
                    } else {
                      console.log("Successfully uploaded data to error.");                
                    }
                    return {
                            recordId: record.recordId,
                            result: 'Ok',
                            data: payload,
                        };
                });
            } else {
                success++;
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        } else {
            /* Failed event, notify the error and leave the record intact */
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
    });
    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, { records: output });
};

LambdaのIAM RoleにS3へのアクセス許可を加えて、もう一度テストしてみます。今度はステータスを501にしてみましょう。

separate-status9

バケットを見てみます。

separate-status10

指定したフォルダができていますね。

separate-status11

新しいファイルが出来上がりました。中身を見てみます。

{"host":"127.0.0.1","ident":"-","authuser":"frank","datetime":"10/Oct/2000:13:55:36 -0700","request":"GET /apache_pb.gif HTTP/1.0","response":"501"}

501のレスポンスのものだけデータがきていますね。成功です。今回は固定のファイル名にしましたが、本番で使う際にはUnixtimeをつけるなどして上書きされないようにしてみて下さい。

ちなみに元データのファイルものぞいてみます。

separate-status6

127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 501 2326

元データがきちんと無加工で入っています。

まとめ

いかがでしたでしょうか。前回のブログのちょい足しレシピをご紹介しました。基本はAthenaで取り出す、どうしてもファイルを分けたい方は条件をLambdaに書き込めば可能です。是非お試しください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.