[Kinesis Data Streams] ストリームを挟んでS3に一覧ログを記録してみました

1 はじめに

CX事業本部の平内(SIN)です。

処理ごとに蓄積される「一覧(ログ)ファイルを、リアルタイムにS3上に作成する」というニーズに対して、昨日、下記のようなブログを書きました。

これに対して、社内から、このような要件なら、「FIFOなSQSより、KinesisでSequenceで管理した方がLambdaも入れやすいし使い勝手は良さそうな気がします。」という貴重なアドバイスを頂いたので、やってみました。

注意:今回も、S3の「結果整合性」の制約下で完全な「排他制御」が出来たわけではありませんので・・・「それでも!」という方は、読み進めて頂ければ嬉しいです。

2 Kinesis Data Streams

Kinesis Data Streamsを log-sample-stream という名前で作成しました。

ストリームを処理するタスクの同時実行を制限するために、シャードを1に設定しています。ストリームの容量は、シャードの数によって決まるため、今回は、下記の読み書きの制限内で使用可能という事になります。

3 プロデューサー

プロデューサー側のコードは、以下のとおりです。こころなしか、SQSに送るより簡単になっている気がします。

import * as AWS from 'aws-sdk';

exports.handler = async (event: any) => {
    await log(event.message);
}

async function log(message: string) {
    const body = {
        message: message
    }

    const kinesis = new AWS.Kinesis();
    const streamName = 'log-sample-stream'; 
    const partitionKey = "log-sample";

    const params = {
        Data: JSON.stringify(body),
        PartitionKey: partitionKey,
        StreamName: streamName,
    };
    await kinesis.putRecord(params).promise();
}

4 コンシューマー

コンシューマー側は、次のとおりです。 Kinesisのコンシューマーは、ストリームをトリガーにできるため、パラメーターで渡されるレコードを処理しているだけです。こちらも、明らかにSQSのキューを処理するより簡単です。

import * as AWS from 'aws-sdk';

const bucketName = 'my-bucket-name';
const key = 'log.txt';

exports.handler = async (event: any) => {

    console.log(JSON.stringify(event));

    try{
        // S3のログファイルを読み込む
        const s3 = new AWS.S3();
        const s3_data = await s3.getObject( {Bucket: bucketName, Key: key} ).promise();
        let text = s3_data.Body!.toString("utf-8");

        await Promise.all(event.Records.map( (record:any) => {
            const data = Buffer.from(record.kinesis.data as string, 'base64').toString();
            console.log('data:', data);
            const body: {message:string} = JSON.parse(data);
            // ログを1行追加
            text += `${getDateTimeString()},${body.message}\n`;

        }))

        // S3へログファイルを保存する
        await s3.putObject({Bucket: bucketName, Key: key, Body: text} ).promise();
    } catch(err){
        console.log(`ERROR: ${JSON.stringify(event)}`)
    }
}

function getDateTimeString(){
    const dt = new Date();
    const Y = dt.getFullYear();
    const M = ("00" + (dt.getMonth()+1)).slice(-2);
    const D = ("00" + dt.getDate()).slice(-2);
    const h = ("00" + (dt.getHours())).slice(-2);
    const m = ("00" + (dt.getMinutes())).slice(-2);
    const s = ("00" + (dt.getSeconds())).slice(-2);
    const ms = ("000" + (dt.getMilliseconds())).slice(-3);
    const result = `${Y}/${M}/${D} ${h}:${m}:${s}.${ms}`;
    return result;
}

Kinesisをトリガーに設定しているようすです。

5 非同期処理

前回と同様に、非同期でプロデューサーを実行してみました。

const exec = require('child_process').exec;
const profile = 'my-profile';
const functionName = 'log-sample';

const max = 100;

[...Array(max).keys()].forEach( i => {
    const param = {message:i};
    exec(`aws lambda invoke --invocation-type Event --function-name ${functionName} --payload '${JSON.stringify(param)}' --p ${profile} response.json`);
})

コンシューマーのログを見ると、以下のように、複数回に分けて起動されていることが確認できました。

14件
5件
13件
6件
...略

予想外(?)に、一覧ログは、欠けること無く100件全部取得できていました。

2019/11/03 22:24:19.223,0
2019/11/03 22:24:19.281,6
2019/11/03 22:24:19.282,1
2019/11/03 22:24:19.282,12
2019/11/03 22:24:19.282,7
2019/11/03 22:24:19.282,2
2019/11/03 22:24:19.282,3
2019/11/03 22:24:19.282,8
2019/11/03 22:24:19.282,11
2019/11/03 22:24:19.301,13
2019/11/03 22:24:19.301,17
2019/11/03 22:24:19.301,9
2019/11/03 22:24:19.302,16
...略

6 最後に

結果は、うまく行きましたが、予想外(?)と書いたのは、懸念していた「結果整合性」の問題をクリアしていないからです。

シャードを1に設定することで、コンシューマーの同時実行の問題はクリアできていますが、S3上の一覧を更新した後、次の更新時に、その内容が最新であるという保証はありません。

どれだけの時間を開けると最新を問題無く取得できるのかは分かりませんが、上記の結果から予想すると、コンシューマーで一覧を更新した後、一定時間ウエイトを置いてからLambdaを終了するようにすれば、次回起動を遅らせられるので、有効なのかも知れません。

ここまで来てなんですが・・・個人的な感想です。「結果整合性と戦ってはいけない」

頂いたアドバイスでは、「順序や結果整合性を必要とするデータ、拡張性とかの要求レベル次第ですが、素直にDBに格納してしまうほうが楽だと思います。」という話も頂いたのでここで紹介させて頂きます。

追伸:Kinesis Data Streamsは、使って無くても課金対象なので、検証が終わったら削除しときます。