NginxのログをLambda, Kinesis Streams を使って MongoDB Atlus に保存 – ClassmethodサーバーレスAdvent Calendar 2017 #serverless #adventcalendar

こんにちわ。西田@大阪です。このエントリはServerless Advent Calendar 2017 6日目の記事です

この記事ではNginxのログを Kinesis Streamsで収集しLambdaを使ってMongoDB Atlusに保存するところまでやってみます

MongoDB Atlusとは?

MongoDB社が提供している、MongoDB のフルマネージドサービスです

MongoDB as a Service:MongoDB Atlasを試してみた | Developers.IO

公式ページ

MongoDB で出来ること

せっかくなのでMongoDBのお気に入りの機能の一つ、配列のフィールドにインデックスを貼れるMultikey IndexesでHTTP Statusに応じて succeedfailureと検索に使えるタグもつけたいと思います

構成

EC2上に動いてるNginxのログをKinesis Agentを使ってKinesis Streamsに書いていき、Lambdaを使ってMongoDB Atlasにデータを保存していきます

事前準備

MongoDB Atlusにサインアップ

MongoDB as a Service:MongoDB Atlasを試してみた

上記を参考に MongoDB Atlus のにサインアップしクラスタを作成します

今回は無料プランを使いたかったのでAWSのバージニアリージョンに作成しました

Kinesis Streamsをセットアップ

任意の名前とシャード数を設定しKinesis Streamsを作成します

KMSの鍵を作成

MongoDB Atlasへの接続情報を暗号化してLambdaの環境変数に設定するためKMSの鍵を作成します

EC2からKinesis Streamsに送信する準備

EC2上のNginxのログをKinesis Streamsに送信するための準備を行います

EC2のインスタンスプロファイルにKinesis, CloudWatchの適切なIAMを設定します

EC2上のNginxのログフォーマットを設定します。今回は簡単のためにタグ区切りで最低限の情報をログに出力させるようにしました

  • /etc/nginx/nginx.conf
log_format  main  "$msec\t$remote_addr\t$status\t$request\t$http_user_agent";

Kinesis Agent はNginxのタグ区切りで出力されるaccess logをJSONとしてKinesis Streamsに送信するよう設定します

  • /etc/aws-kinesis/agent.json
{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      // 送信する対象のパス
      "filePattern": "/var/log/nginx/access.log*",
      // Kinesis Streams の名前
      "kinesisStream": "mongodb-sample",
      "dataProcessingOptions": [
          {
              "optionName": "SINGLELINE"
          },
          {
              "optionName": "CSVTOJSON",
             // JSONのフィールド名を指定。配列の順番とタブで区切られるログの値と対応させる
              "customFieldNames": [ "time", "remote_addr", "status", "method", "user_agent" ],
             // 区切り文字を指定
             "delimiter": "\\t"
          }
      ],
      "partitionKeyOption": "RANDOM"
    }
  ]
}

Lamdbaを設定する

MongoDB Atlas への接続情報を確認します

クラスタの管理画面から接続に必要なmongodb://から始まる接続文字列とMongo Shellで接続するためのコマンドをメモします

MongoDBのIndexを準備する

タグ検索のためのIndexを準備します

こういった操作がSQLでなくJavaScriptで簡単に操作出来るのもMongoDBの特徴のひとつです

  • mongoコマンドがインストール済みでない場合はインストールする必要があります(下記はMacの例です)
brew install mongodb --with-openssl
  • indexを作成します
mongo "mongodb://cluster0-xxx.mongodb.net:27017,cluster0-xxx.mongodb.net:27017,cluster0-xxx.mongodb.net:27017/test?replicaSet=Cluster0-shard-0" --authenticationDatabase admin --ssl --username mongosample --password 設定したパスワード

Cluster:PRIMARY> db.createCollection('logs')
Cluster:PRIMARY> db.getCollection('logs').createIndex({tags: 1})

Lambda

事前準備

  • Kinesis、CloudWatch、KMSなどのIAMを設定する
  • MONGODB_ATLAS_CLUSTER_URIという名前で環境変数にMongodb Atlasへの接続情報を設定しKMSで暗号化します

ソース

こちらを参考にさせていただきました。

Node.js, AWS Lambda and MongoDB Atlas - Code Tutorial | MongoDB

  • ハンドラー
const MongoClient = require('mongodb').MongoClient;
const AWS = require('aws-sdk');

// 接続情報、DBへのコネクションをキャッシュする変数
let atlasConnectionUri = null;
let cachedDb = null;

exports.handler = (event, context, callback) => {
    var uri = process.env['MONGODB_ATLAS_CLUSTER_URI'];

    // 接続情報が既にあればそれを使い、なければKMSで復号化しキャッシュの変数に代入します
    if (atlasConnectionUri != null) {
        processEvent(event, context, callback);
    } else {
        const kms = new AWS.KMS();
        kms.decrypt({ CiphertextBlob: new Buffer(uri, 'base64') }, (err, data) => {
            if (err) {
                console.log('Decrypt error:', err);
                return callback(err);
            }
            atlasConnectionUri = data.Plaintext.toString('ascii');
            processEvent(event, context, callback);
        });
    }
};
  • イベント処理
function processEvent(event, context, callback) {
    // DBのコネクションを使いまわすためにEvent Loopが残っててもLambdaが終了する設定を行います
    context.callbackWaitsForEmptyEventLoop = false;

    if (cachedDb == null) {
        console.log('=> connecting to database');
        // MongoDBへ接続し、成功すればキャッシュの変数に代入します
        MongoClient.connect(atlasConnectionUri, (err, db) => {
            if (err != null) {
                console.error("an error occurred in connecting to database", err);
                callback(null, JSON.stringify(err));
            }
            console.log('=> database connected');

            cachedDb = db;
            return createDoc(db, event, callback);
        });
    } else {
        // DBのコネクションが存在すればそれを使いまわします
        console.log('Re Use database connection from cache');
        createDoc(cachedDb, event, callback);
    }
}
  • ドキュメントの挿入
function createDoc(db, event, callback) {
    let coll = db.collection('logs');
    // Bulk処理のためのオブジェクトを生成
    let batch = coll.initializeUnorderedBulkOp();

    try {
        event.Records.map((record) => {
            // ログはBase64でエンコードされたJSON形式の文字列で送られてくるのでパースします
            const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
            const obj = JSON.parse(payload);

            // HTTP Statusを見てタグを設定します
            if (!obj.status.startsWith('2')) {
                obj.tags = ['failure'];
            }

            // Bulk Insert するためのキューに登録します
            batch.insert(obj);
        });

        // Bulk Insert で登録します
        batch.execute((err, result) => {
            if (err != null) {
                console.error('an error occured in batch insert');
                callback(null, JSON.stringify(err));
            } else {
                callback(null, "SUCCESS");
            }
        });

        // DBのコネクションを使いまわすため、db.closeは明示的に呼びません
    } catch (err) {
        console.error('error occured', err);
    }
}

最後に

いかかでしたでしょうか?

多機能で柔軟なMongoDBをフルマネージドで使えるので、サーバーレスでDynamoDBでは難しいユースケースでは一考の価値があると思われます。

今回は接続制限などを特に行っていなく、パスワード認証のみでやりましたが、次回 VPC Lambda と VPC Peeringを使ったセキュアな接続もやってみたいと思います。