(小ネタ)Amazon Kinesis Data Firehose の Source record transformation用Lambdaサンプル ( Node v8.10.0 )

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

Amazon Kinesis Data Firehoseでは、Source record transformationEnableにすることで、ストリームを流れるデータを保存する前に、Lambdaで編集することが可能です。

Lambdaのコードは、テンプレートが用意されていますが、ここには、まだNode6.10のものしか無いようです。 最近、LambdaでもNode8.10で async/awaitを使えるようになった感じから、ちょっとNode 6が辛いです。

検索もしてみたのですが、まだ、asyncで書かれたサンプルは、ちょっと見つけられなかったので、まったく内容がなくて恐縮なのですが、纏めておくことにしました。

完全に自分用です、すいません。

2 Source record transformation

Source record transformationを有効にして、対象のLambdaを設定します。Nodeのバージョンが8.10と表示されていることを確認できます。

3 ストリームへのデータ送信

ストリームに流すJSONは、仮に下記のようなものをとしました。

{
    test: "hello"
}

AWS CLIからストリーム名を指定して、上のデータを送っているようすです。

$ aws firehose put-record --delivery-stream-name sample-stream --record '{"Data":"{\"text\":\"hello\"}"}'
{
    "RecordId": "ihO/1B/5X9N8i7DSbS6lnrhiiW73FvVj1G9TldkA4En2v6xSfTti39XCVx12rRDmKTQYFZI7v6b4dl72oLM5D+N318lcrusgMJNT3s3LdabgC44Z/KLOGzRvwOHRQ/wlZSaTTQl6hT6kNcymnCvm5Isf1JtcHm+R1QmrJMlgizjU9joJiSruuBM2GeyVR2ifDFmCxJesFmGscyjPt8o6J2ovoyI04Lu7"
}

4 Lambda

Lambdaのサンプルコードです。ストリーム上のJSONデータをパースして編集しています。具体的には、キーtestに文字列を追加しました。

exports.handler = async (event) => {
    // records配列をすべて処理する
    const output = event.records.map(record => {

        // Base64で送られてきた1レコードをデコードする
        const data = Buffer.from(record.data, 'base64').toString('utf8');

        // データはJSONの前提なので、パースする
        let parsedData = JSON.parse(data);

        // JSONデータの編集
        parsedData.text += ' World';

        // 戻り値の生成
        // RecordId : 変更なし
        // result : Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)
        // data: base64でエンコードしたデータ
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: Buffer.from(JSON.stringify(parsedData), 'utf8').toString('base64')
        };
    });
    // records:[ {recordId: string, result: string, data: string}] の形式でKinesisに戻す
    return {records: output}
};

変換後、S3に保存されたデータです。

5 最後に

今回は、Source record transformation用のLambdaのコードを纏めました。

繰り返しますが、完全に自分用の覚書です。ほんとごめんなさい。

6 参考リンク


Kinesis で AWS Lambda を使用する
Amazon Kinesis Data Firehose のデータ変換