(小ネタ)Amazon Kinesis Data Firehose の Source record transformation用Lambdaサンプル ( Node v8.10.0 )
1 はじめに
Amazon Kinesis Data Firehoseでは、Source record transformationをEnableにすることで、ストリームを流れるデータを保存する前に、Lambdaで編集することが可能です。
Lambdaのコードは、テンプレートが用意されていますが、ここには、まだNode6.10のものしか無いようです。 最近、LambdaでもNode8.10で async/awaitを使えるようになった感じから、ちょっとNode 6が辛いです。
検索もしてみたのですが、まだ、asyncで書かれたサンプルは、ちょっと見つけられなかったので、まったく内容がなくて恐縮なのですが、纏めておくことにしました。
完全に自分用です、すいません。
2 Source record transformation
Source record transformationを有効にして、対象のLambdaを設定します。Nodeのバージョンが8.10と表示されていることを確認できます。
3 ストリームへのデータ送信
ストリームに流すJSONは、仮に下記のようなものをとしました。
{ test: "hello" }
AWS CLIからストリーム名を指定して、上のデータを送っているようすです。
$ aws firehose put-record --delivery-stream-name sample-stream --record '{"Data":"{\"text\":\"hello\"}"}' { "RecordId": "ihO/1B/5X9N8i7DSbS6lnrhiiW73FvVj1G9TldkA4En2v6xSfTti39XCVx12rRDmKTQYFZI7v6b4dl72oLM5D+N318lcrusgMJNT3s3LdabgC44Z/KLOGzRvwOHRQ/wlZSaTTQl6hT6kNcymnCvm5Isf1JtcHm+R1QmrJMlgizjU9joJiSruuBM2GeyVR2ifDFmCxJesFmGscyjPt8o6J2ovoyI04Lu7" }
4 Lambda
Lambdaのサンプルコードです。ストリーム上のJSONデータをパースして編集しています。具体的には、キーtestに文字列を追加しました。
exports.handler = async (event) => { // records配列をすべて処理する const output = event.records.map(record => { // Base64で送られてきた1レコードをデコードする const data = Buffer.from(record.data, 'base64').toString('utf8'); // データはJSONの前提なので、パースする let parsedData = JSON.parse(data); // JSONデータの編集 parsedData.text += ' World'; // 戻り値の生成 // RecordId : 変更なし // result : Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった) // data: base64でエンコードしたデータ return { recordId: record.recordId, result: 'Ok', data: Buffer.from(JSON.stringify(parsedData), 'utf8').toString('base64') }; }); // records:[ {recordId: string, result: string, data: string}] の形式でKinesisに戻す return {records: output} };
変換後、S3に保存されたデータです。
5 最後に
今回は、Source record transformation用のLambdaのコードを纏めました。
繰り返しますが、完全に自分用の覚書です。ほんとごめんなさい。