[SQS] FIFOキューを挟んでS3に一覧ログを記録してみました

1 はじめに

CX事業本部の平内(SIN)です。

時系列に記録された情報(ログ)を、まとめて一覧したいというニーズは、結構あると思います。 処理のたびに下記のように情報(ログ)を追記していく一覧ファイルをいきなりS3上で生成するのは、結果整合性の制限から無理があります。

  • 一覧(ログ)ファイルの読み込み
  • 一覧(ログ)ファイルに追記
  • 一覧(ログ)ファイルの保存

本記事は、それでも、S3上で一覧を生成したいと言うことで、SQS(FIFO)を挟んで試したみた記録です。

注意:試した結果は、微妙なので、「それでも!」という方は、読み進めて頂ければ嬉しいです。

2 排他制御の失敗

(1) ログ一覧の生成

最初に、S3上の一覧ファイルにログを追記して保存するLambda関数です。

import * as AWS from 'aws-sdk';

exports.handler = async (event: any) => {
    // ログ保存
    await log(event.message);
}

const bucketName = 'my-bucket-name';
const key = 'log.txt';

// ログ保存
async function log(message: string){
    const s3 = new AWS.S3();
    // S3のログファイルを読み込む
    const data = await s3.getObject( {Bucket: bucketName, Key: key} ).promise();
    let text = data.Body!.toString("utf-8");
    // ログを1行追加
    text += `${getDateTimeString()},${message}\n`;
    // S3へログファイルを保存する
    await s3.putObject({Bucket: bucketName, Key: key, Body: text} ).promise();
}

// 現在時の日付文字列の生成
function getDateTimeString(){
    const dt = new Date();
    const Y = dt.getFullYear();
    const M = ("00" + (dt.getMonth()+1)).slice(-2);
    const D = ("00" + dt.getDate()).slice(-2);
    const h = ("00" + (dt.getHours())).slice(-2);
    const m = ("00" + (dt.getMinutes())).slice(-2);
    const s = ("00" + (dt.getSeconds())).slice(-2);
    const ms = ("000" + (dt.getMilliseconds())).slice(-3);
    const result = `${Y}/${M}/${D} ${h}:${m}:${s}.${ms}`;
    return result;
}

(2) 同期処理

先のLambdaを、下記のようなシェルで実行してみました。

const execSync = require('child_process').execSync;
const profile = 'my-profile';
const functionName = 'log-sample';

const max = 10;

[...Array(max).keys()].forEach( i => {
    const param = {message:i};
    execSync(`aws lambda invoke --invocation-type Event --function-name ${functionName} --payload '${JSON.stringify(param)}' --p ${profile} response.json`);
})

execSyncで同期実行しているため、一応、正常に一覧が生成できました。しかし、高速に実行すると、S3の結果整合性から考えて、決して安全ではないと言えます。

2019/11/02 10:55:24.709,0
2019/11/02 10:55:26.269,1
2019/11/02 10:55:27.382,2
2019/11/02 10:55:28.485,3
2019/11/02 10:55:30.161,4
2019/11/02 10:55:31.577,5
2019/11/02 10:55:32.648,6
2019/11/02 10:55:33.789,7
2019/11/02 10:55:34.977,8
2019/11/02 10:55:36.736,9

(3) 非同期処理

同じLambdaを非同期で実行すると一気に一覧生成は破綻している事が確認できます。

const exec = require('child_process').exec;
const profile = 'my-profile';
const functionName = 'log-sample';

const max = 100;

[...Array(max).keys()].forEach( i => {
    const param = {message:i};
    exec(`aws lambda invoke --invocation-type Event --function-name ${functionName} --payload '${JSON.stringify(param)}' --p ${profile} response.json`);
})

下記の一覧は、省略されていません。100件分のログは、14件となってしまいました。

2019/11/02 10:51:07.621,9
2019/11/02 10:51:16.822,12
2019/11/02 10:51:17.033,20
2019/11/02 10:51:18.685,17
2019/11/02 10:51:24.393,19
2019/11/02 10:51:25.076,2
2019/11/02 10:51:26.442,1
2019/11/02 10:51:26.747,5
2019/11/02 10:51:26.937,44
2019/11/02 10:51:27.045,49
2019/11/02 10:51:27.414,21
2019/11/02 10:51:28.012,36
2019/11/02 10:51:28.317,85
2019/11/02 10:51:28.610,70

3 階層化

「処理」と「ログ記録」のロジックを2階層に分割し、間にSQSを挟みます。

(1) SQS

作成したキューは、FIFOキューです。

(2) プロデューサー

「処理」側は、SQSのプロデューサーとして実装します。

import * as AWS from 'aws-sdk';

exports.handler = async (event: any) => {
    await log(event.message);
}

async function log(message: string) {
    const body = {
        message: message
    }
     
    const account  = process.env.ACCOUNT;
    const region = 'ap-northeast-1';
    const queueName = 'log-sample.fifo';
    const url = `https://sqs.${region}.amazonaws.com/${account}/${queueName}`;
    const deduplicationId = Math.random().toString(32).substring(2); // 重複制御
    const groupId = 'log-sample'; // 同じグループとする
    
    const sqs = new AWS.SQS();
    const params: AWS.SQS.Types.SendMessageRequest = {
        MessageBody: JSON.stringify(body),
        MessageGroupId: groupId,
        MessageDeduplicationId: deduplicationId,
        QueueUrl: url,
    };
    console.log(JSON.stringify(params));
    await sqs.sendMessage(params).promise();
}

(3) コンシューマー

階層化した2つのロジックうち「ログ記録」側は、SQSのコンシューマーとなります。

import * as AWS from 'aws-sdk';

const bucketName = 'my-bucket-name';
const key = 'log.txt';

exports.handler = async (_event: any) => {const sqs = new AWS.SQS();
    
    const account  = process.env.ACCOUNT;
    const region = 'ap-northeast-1';
    const queueName = 'log-sample.fifo';
    const url = `https://sqs.${region}.amazonaws.com/${account}/${queueName}`;

    // S3のログファイルを読み込む
    const s3 = new AWS.S3();
    const s3_data = await s3.getObject( {Bucket: bucketName, Key: key} ).promise();
    let text = s3_data.Body!.toString("utf-8");

    const params: AWS.SQS.ReceiveMessageRequest = {
        MaxNumberOfMessages: 10,
        QueueUrl: url,
        WaitTimeSeconds: 0
    };

    while(true) {
        const data = await sqs.receiveMessage(params).promise();
        if (!data.Messages) {
            break;
        }
        try {
            await Promise.all(data.Messages.map( async message => {
                const body:{message: string} = JSON.parse(message.Body!);
                console.log(body.message);
                
                // ログを1行追加
                text += `${getDateTimeString()},${body.message}\n`;
                // メッセージの削除
                var deleteParams = {
                    QueueUrl: url,
                    ReceiptHandle: message.ReceiptHandle!
                };
                await sqs.deleteMessage(deleteParams).promise();
            }))
        } catch(err) {
            console.log(err);
            break;
        }
    }
    // S3へログファイルを保存する
    await s3.putObject({Bucket: bucketName, Key: key, Body: text} ).promise();
}

function getDateTimeString(){
    // 略
}

4 結果

execで非同期に処理を回してみます。

const exec = require('child_process').exec;
const profile = 'my-profile';
const functionName = 'log-sample';

const max = 100;

[...Array(max).keys()].forEach( i => {
    const param = {message:i};
    exec(`aws lambda invoke --invocation-type Event --function-name ${functionName} --payload '${JSON.stringify(param)}' --p ${profile} response.json`);
})

結果、以下のとおりです。省略されていますが、100件の一覧が確認できました。また、記録は時系列になっている事も確認できています。

2019/11/02 19:19:29.973,4
2019/11/02 19:19:30.053,0
2019/11/02 19:19:30.111,2
2019/11/02 19:19:30.112,1
2019/11/02 19:19:30.113,3
2019/11/02 19:19:30.170,5
2019/11/02 19:19:30.209,6
2019/11/02 19:19:30.229,14
2019/11/02 19:19:30.269,8
2019/11/02 19:19:30.289,9
2019/11/02 19:19:30.891,10
2019/11/02 19:19:30.892,15
2019/11/02 19:19:30.950,11
2019/11/02 19:19:30.951,13
・・・略・・・

5 その他

(1) パーミッション

Lambdaのパーミッションには、触れませんでしたが、プロデューサーには、SQSへの書き込み、コンシューマーには、SQSの読み書きとS3への読み書きの権限追加が必要です。

(2) タイムアウト

コンシューマーは、キューが多いと、処理に時間がかかるので、少しタイムアウト値の設定に注意が必要です。

(3) 排他制御

排他制御のため、コンシューマーは、多重で起動できません。このため、コンシューマーのLambdaは、同実行数を1に設定しています。

(4) トリガー

FIFOキューを使用する場合、キューへの追加をコンシューマーのトリガーに出来ないので、CloudWatch のイベントでルールを設定しています。

下図では、間隔を5分としていますが、S3の結果整合性を念頭に、「1回で処理するキューの数」と「スケジュールの間隔」を注意深く検討する必要があります。要件が許せば、トリガーの間隔は、充分にとる方が安全だと思います。

6 最後に

最後になって、これ言うのも何ですが・・・結局「結果整合性」上での「排他制御」をクリアできていません。S3上で一覧をいきなり生成するのは、正直、悪手です。

ログは、1件1ファイル、一覧は、改めて別に作成しましょう・・・