Javascript(Node.js)版aws-sdkを使って CloudWatchLogs からEventをFilterする

はじめに

「なんかおかしい」と言われてログを漁る、でも特に何も見つからない。ホントに無いのか、探し方が悪いのか、そもそも探してる場所が違うのか、、、疑心暗鬼に駆られる事はありませんか?
cli だと、コマンドの打ち間違いもあるかもしれませんね。
慣れてる人はそうでも無いのかもしれませんが、それぞれシステム事情も有ります。余計な心労は無いにこしたことはないですね。

という事で、プログラムしましょう。今回採用したのはNode.jsです(理由は特にありません)。

CloudWatchLogsからEventをFilterする

aws-cliaws logs filter-log-events に相当する処理を実装します。

よく見かけるサンプル実装

まずはさらっと

// init AWS
// ↓のコードは、~/.aws/credentials にProfileが定義されてる前提
const AWS = require('aws-sdk');
AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: 'your-profile' });
AWS.config.update({region: 'ap-northeast-1'});

// 適当なパラメータ
const params = {
    logGroupName: 'log-group',
    logStreamNames: ['log-stream-1', 'log-stream-2'],
    filterPattern: '"some-filter-value"', // ハイフン等を含む場合は二重に囲む事が必須
    startTime: new Date('2019-10-20T09:00:00+0900').valueOf(),
    endTime: new Date('2019-10-20T18:00:00+0900').valueOf(),
};

const CWLogs = new AWS.CloudWatchLogs();
CWLogs.filterLogEvents(params, (error, data) => {
    if (error)
        console.log("error", error);
    else {
        console.log("events:", data.events.length);
        data.events.forEach(ev => console.log(ev));
        console.log("done!");
    }
});

さてこれで期待した結果が得られましたか?
残念ながら、たいていの条件・状況において、これでは不足です。CloudWatchLogsでは、一度のクエリで走査されるボリュームが制限されているからです。
本番環境のように大量のログデータが存在する状況で、このようなコードで、期待した結果を得られるような事はまずありません(対象範囲日時をよほど絞り込めている場合でも無い限り)。

現実的には、nextTokenを使った再帰的な問合せが必要になります。

nextToken を使って再帰処理する

という事でPromise使って再帰処理を実装します。

// AWSの初期化とかパラメータ等々は省略

const CWLogs = new AWS.CloudWatchLogs();

const processDataChunk = function(data) {
    console.log("chunk-size:", data.events.length);
    data.events.forEach(ev => console.log(ev));
    return Promise.resolve(data);
};

const maxTimesToInquire = 10;
const parseRecursively = function(params, count = 1, nextToken = null) {
    if (count > maxTimesToInquire)
        return Promise.resolve("# reach the limit!");
    console.log(`will inquire: ${count}`);

    if (nextToken)
        params.nextToken = nextToken;

    return CWLogs.filterLogEvents(params)
        .promise()
        .then(processDataChunk)
        .then(data => {
            if (data.nextToken)
                return parseRecursively(params, ++count, data.nextToken);
            else
                return Promise.resolve("# completed!");
        });
};

return parseRecursively(params)
    .then(console.log)
    .catch(console.log);

これで、再帰問合せが実現できました。 問合せの上限回数を緩くすれば、指定日時範囲の全てのログを探索し続ける事も可能です(もちろん自己責任でお願いします)。

使い回せるように整える

やってみただとここまでなんですが、せっかくなので、もう少しちゃんとやりきりたいと思います。
まず filterLogEvents のクエリ条件を、コマンドライン引数から貰う事にします。ここはライブラリ(command-line-args)の力を借りて、スッキリまとめたいですね。defaultValueも定義して、無駄な入力は減らしましょう。
日時情報には定番のmomentmoment-timezoneを使って、JSTデフォルト、短縮入力も可能にします。入力ミスは怖いのでちゃんとValidateも。

'use strict';

const AWS = require('aws-sdk');
const moment = require("moment-timezone");
const commandLineArgs = require('command-line-args');
const convertKeysToCamelCase = require('camelcase-keys');

const CWLOGS_FILTER_COMMAND_OPTIONS = [
    { name: 'log-group-name',   alias: 'g', type: String, },
    { name: 'log-stream-names', alias: 't', type: String, multiple: true },
    { name: 'start-time',       alias: 's', type: str => parseDateArgs(str), },
    { name: 'end-time',         alias: 'e', type: str => parseDateArgs(str), },
    { name: 'filter-pattern',   alias: 'f', type: str => '"' + str + '"', },
    { name: 'limit',            alias: 'l', type: Number, },
];
const COMMAND_LINE_OPTIONS = CWLOGS_FILTER_COMMAND_OPTIONS.concat([
    { name: 'profile',              alias: 'p', type: String, defaultValue: 'default' },
    { name: 'max-times-to-inquire', alias: 'm', type: Number, defaultValue: 10 },
    { name: 'no-debug-log',         alias: 'n', type: Boolean, defaultValue: false },
]);

const parseDateArgs = function (dateStr) {
    const m =  moment.tz(dateStr, 'Asia/Tokyo'); // default = JST
    if (m.isValid()) return m.valueOf();
    else throw new Error(`Invalid date str: '${dateStr}'`);
};

const debug = function() {
    if (!noDebugLog) console.log(...arguments);
};

// parse command line args.
const options = convertKeysToCamelCase(commandLineArgs(COMMAND_LINE_OPTIONS));
const { profile, maxTimesToInquire, noDebugLog, ...queryParams } = options; // split object with key.

// debug params
if (queryParams.startTime) debug("startTime:", moment(queryParams.startTime).format());
if (queryParams.endTime)   debug("endTime:", moment(queryParams.endTime).format());
debug(queryParams);

// init aws
AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: profile });
AWS.config.update({ region: 'ap-northeast-1' });

const CWLogs = new AWS.CloudWatchLogs();
const processDataChunk = function(data) {
    debug("chunk-size:", data.events.length);
    data.events.forEach(e => console.log(e));
    return Promise.resolve(data);
};
const parseRecursively = function(params, count = 1, nextToken = null) {
    if (count > maxTimesToInquire)
        return Promise.resolve("# reach the limit!");
    debug(`recursive-parse: ${count}`);

    if (nextToken)
        params.nextToken = nextToken;

    return CWLogs.filterLogEvents(params)
        .promise()
        .then(processDataChunk)
        .then(data => {
            if (data.nextToken)
                return parseRecursively(params, ++count, data.nextToken);
            else
                return Promise.resolve("# completed!");
        });
};

return parseRecursively(queryParams)
    .then(debug)
    .catch(console.log);

ベースとしてはこんな感じでどうでしょう?
あとは、ログストリーム毎にカスタマイズしたものを用意しておけば、コマンドラインからの入力の手間も省けますし、processDataChunk関数内でログの内容に応じた処理を実装する事で、より見やすく加工したり、後処理を楽にする事もできると思います。

まとめ

という事で、私は上記のようなスクリプトを、ログストリーム毎や、定番の抽出条件などにカスタマイズしたものを複数用意しています。
やるべき事を定型化できるようになり、結果、心労は大きく軽減されました。
同じように、心病みそうな思いをされてる方の助けになれば嬉しいです。