Kinesis Data FirehoseのJSONがつながった形式のレコードをJSON Lines形式に変換して出力する(Node.js)

2021.12.08

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、Amazon Kinesis Data FirehoseのJSONがつながった形式のレコードをJSON Lines形式に変換して出力する方法を確認してみました。

Delivery Streamの既定の出力形式ではAthenaでうまくクエリできない

Amazon Kinesis Data FirehoseのDelivery Streamを使用すると、Kinesis Data Streamなどから入力された連続的なストリームデータを1つのオブジェクトにまとめてS3バケットなどに出力することができます。

その際に、Delivery Streamの既定の出力形式は下記のようにJSONデータが1行につながった形式となります。

{"CHANGE":1.15,"PRICE":22.5,"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY"}{"CHANGE":2.07,"PRICE":72.58,"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL"}{"CHANGE":8.79,"PRICE":97.89,"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY"}{"CHANGE":-2.35,"PRICE":173.81,"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY"}

このデータが記載されたオブジェクトがS3に出力されるのですが、Amazon Athenaによるクエリではうまくパースすることができません。先頭1つ目のJSONのレコードのみクエリ結果として取得できます。

そのためS3にレコードはちゃんと出力できているはずなのにAthenaでクエリしてみたけど何だかレコード数が少ない、ということが起こりハマったりします。またS3 Selectでクエリした場合は1行のJSONであってもすべてのレコードをパースできるという仕様も混乱にさらなる拍車を掛けます。

そこで今回は、Delivery StreamのJSONが1行につながった形式のレコードを、Athenaで読み取り可能なJSON Lines形式に変換して出力する構成を作成してみます。

やってみた

マネジメントコンソールから次のような構成を作成してみます。

Lambda関数の作成

まずJSON形式のレコードをJSON Lines形式に変換するLambda関数を作成します。

マネジメントコンソールのLambda関数の作成ページで、[Runtime]でNode.js 14.xを選択し、[Create function]をクリックします。

以下のNode.jsのコードをindex.jsに指定します。

index.js

exports.handler = (event, context, callback) => {
  const output = event.records.map((record) => {
    
    //レコードのJSONの末尾に改行コード(\n)を連結する
    let entry = (new Buffer(record.data, 'base64')).toString('utf8');
    let result = entry + '\n'
    const payload = (new Buffer(result, 'utf8')).toString('base64');
        
      return {
          recordId: record.recordId,
          result: 'Ok',
          data: payload,
        };
  });
  console.log(`Processing completed.  Successful records ${output.length}.`);
  callback(null, { records: output });
};

[Deploy]をクリックして関数をデプロイします。

[Configuration]タブ-[General configuration]で、[Edit]をクリックします。

[Timeout]を余裕を見て1分を指定して、[Save]をクリックして保存します。

保存できました。

Delivery Streamの作成

次にDelivery Streamを作成します。

マネジメントコンソールのDelivery streamsのページで、[Create delivery stream]をクリックします。

Create a delivery stream画面が開きます。[Choose source and destination]で[Source]で今回は入力レコードを手動で投入するのでDirect PUTを選択し、[Destination]でAmazon S3を選択します。

[Transform and convert records]-[Transform source records with AWS Lambda]で、[Data transformation]でEnabledを選択し、[Browse]をクリックします。

先程作成したLambda関数を選択して、[Choose]をクリックします。

Lambda関数を指定できました。

[Destination settings]で、[S3 bucket]で出力先のS3バケットを指定し、[S3 bucket prefix]と[S3 bucket error output prefix]で出力先プレフィクスを指定します。

[Create delivery stream]をクリックしてDelivery Streamの作成します。

作成できました。

動作確認

Delivery Streamに対してテストデータを送信します。

[Start sending demo data]をクリックして送信を開始します。

数秒ほど送信し続けたら[Stop sending demo data]をクリックして送信を停止します。

数分経つと対象のバケットのプレフィクスにオブジェクトが作成されました。

オブジェクトをダウンロードして開いてみると、JSONオブジェクト間に改行が挿入され、JSON Linesとなっています。

{"CHANGE":1.15,"PRICE":22.5,"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY"}
{"CHANGE":2.07,"PRICE":72.58,"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL"}
{"CHANGE":8.79,"PRICE":97.89,"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY"}
{"CHANGE":-2.35,"PRICE":173.81,"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY"}

参考

以上