(小ネタ)Amazon Kinesis Data Firehose の Source record transformation用Lambdaサンプル ( Node v8.10.0 )

2018.11.15

1 はじめに

Amazon Kinesis Data Firehoseでは、Source record transformationEnableにすることで、ストリームを流れるデータを保存する前に、Lambdaで編集することが可能です。

Lambdaのコードは、テンプレートが用意されていますが、ここには、まだNode6.10のものしか無いようです。 最近、LambdaでもNode8.10で async/awaitを使えるようになった感じから、ちょっとNode 6が辛いです。

検索もしてみたのですが、まだ、asyncで書かれたサンプルは、ちょっと見つけられなかったので、まったく内容がなくて恐縮なのですが、纏めておくことにしました。

完全に自分用です、すいません。

2 Source record transformation

Source record transformationを有効にして、対象のLambdaを設定します。Nodeのバージョンが8.10と表示されていることを確認できます。

3 ストリームへのデータ送信

ストリームに流すJSONは、仮に下記のようなものをとしました。

{
test: "hello"
}

AWS CLIからストリーム名を指定して、上のデータを送っているようすです。

$ aws firehose put-record --delivery-stream-name sample-stream --record '{"Data":"{\"text\":\"hello\"}"}'
{
"RecordId": "ihO/1B/5X9N8i7DSbS6lnrhiiW73FvVj1G9TldkA4En2v6xSfTti39XCVx12rRDmKTQYFZI7v6b4dl72oLM5D+N318lcrusgMJNT3s3LdabgC44Z/KLOGzRvwOHRQ/wlZSaTTQl6hT6kNcymnCvm5Isf1JtcHm+R1QmrJMlgizjU9joJiSruuBM2GeyVR2ifDFmCxJesFmGscyjPt8o6J2ovoyI04Lu7"
}

4 Lambda

Lambdaのサンプルコードです。ストリーム上のJSONデータをパースして編集しています。具体的には、キーtestに文字列を追加しました。

exports.handler = async (event) => {
// records配列をすべて処理する
const output = event.records.map(record => {

// Base64で送られてきた1レコードをデコードする
const data = Buffer.from(record.data, 'base64').toString('utf8');

// データはJSONの前提なので、パースする
let parsedData = JSON.parse(data);

// JSONデータの編集
parsedData.text += ' World';

// 戻り値の生成
// RecordId : 変更なし
// result : Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)
// data: base64でエンコードしたデータ
return {
recordId: record.recordId,
result: 'Ok',
data: Buffer.from(JSON.stringify(parsedData), 'utf8').toString('base64')
};
});
// records:[ {recordId: string, result: string, data: string}] の形式でKinesisに戻す
return {records: output}
};

変換後、S3に保存されたデータです。

5 最後に

今回は、Source record transformation用のLambdaのコードを纏めました。

繰り返しますが、完全に自分用の覚書です。ほんとごめんなさい。

6 参考リンク

Kinesis で AWS Lambda を使用する

Amazon Kinesis Data Firehose のデータ変換