この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
Amazon Kinesis Data Firehoseでは、Source record transformationをEnableにすることで、ストリームを流れるデータを保存する前に、Lambdaで編集することが可能です。
Lambdaのコードは、テンプレートが用意されていますが、ここには、まだNode6.10のものしか無いようです。 最近、LambdaでもNode8.10で async/awaitを使えるようになった感じから、ちょっとNode 6が辛いです。
検索もしてみたのですが、まだ、asyncで書かれたサンプルは、ちょっと見つけられなかったので、まったく内容がなくて恐縮なのですが、纏めておくことにしました。
完全に自分用です、すいません。
2 Source record transformation
Source record transformationを有効にして、対象のLambdaを設定します。Nodeのバージョンが8.10と表示されていることを確認できます。
3 ストリームへのデータ送信
ストリームに流すJSONは、仮に下記のようなものをとしました。
{
test: "hello"
}
AWS CLIからストリーム名を指定して、上のデータを送っているようすです。
$ aws firehose put-record --delivery-stream-name sample-stream --record '{"Data":"{\"text\":\"hello\"}"}'
{
"RecordId": "ihO/1B/5X9N8i7DSbS6lnrhiiW73FvVj1G9TldkA4En2v6xSfTti39XCVx12rRDmKTQYFZI7v6b4dl72oLM5D+N318lcrusgMJNT3s3LdabgC44Z/KLOGzRvwOHRQ/wlZSaTTQl6hT6kNcymnCvm5Isf1JtcHm+R1QmrJMlgizjU9joJiSruuBM2GeyVR2ifDFmCxJesFmGscyjPt8o6J2ovoyI04Lu7"
}
4 Lambda
Lambdaのサンプルコードです。ストリーム上のJSONデータをパースして編集しています。具体的には、キーtestに文字列を追加しました。
exports.handler = async (event) => {
// records配列をすべて処理する
const output = event.records.map(record => {
// Base64で送られてきた1レコードをデコードする
const data = Buffer.from(record.data, 'base64').toString('utf8');
// データはJSONの前提なので、パースする
let parsedData = JSON.parse(data);
// JSONデータの編集
parsedData.text += ' World';
// 戻り値の生成
// RecordId : 変更なし
// result : Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)
// data: base64でエンコードしたデータ
return {
recordId: record.recordId,
result: 'Ok',
data: Buffer.from(JSON.stringify(parsedData), 'utf8').toString('base64')
};
});
// records:[ {recordId: string, result: string, data: string}] の形式でKinesisに戻す
return {records: output}
};
変換後、S3に保存されたデータです。
5 最後に
今回は、Source record transformation用のLambdaのコードを纏めました。
繰り返しますが、完全に自分用の覚書です。ほんとごめんなさい。