Kinesis Data FirehoseのJSONがつながった形式のレコードをJSON Lines形式に変換して出力する(Node.js)

2021.12.08

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、Amazon Kinesis Data FirehoseのJSONが一行につながった形式のレコードをJSON Lines形式に変換して出力する方法を確認してみました。

Delivery Streamの既定の出力形式ではAthenaでうまくクエリできない

Amazon Kinesis Data FirehoseのDelivery Streamを使用すると、Kinesis Data Streamなどから入力された連続的なストリームデータを1つのオブジェクトにまとめてS3バケットなどに出力することができます。

その際に、Delivery Streamの既定の出力形式は下記のようにJSONデータが1行につながった形式となります。

既定の形式

{"CHANGE":1.15,"PRICE":22.5,"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY"}{"CHANGE":2.07,"PRICE":72.58,"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL"}{"CHANGE":8.79,"PRICE":97.89,"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY"}{"CHANGE":-2.35,"PRICE":173.81,"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY"}

このデータが記載されたオブジェクトがS3に出力されるのですが、Amazon Athenaによるクエリではうまくパースすることができません。先頭の1つ目のJSONのレコードのみクエリ結果として取得できます。

そのためS3にレコードはちゃんと出力できているはずなのにAthenaでクエリしてみたけど何だかレコード数が少ない、ということが起こりハマったりします。またS3 Selectでクエリした場合は1行のJSONであってもすべてのレコードをパースできるという仕様も混乱にさらなる拍車を掛けます。

そこで上記の形式を下記のようなJSON Lines形式(JSONレコードごとに改行された形式)に変換すれば、Athenaで上手くクエリが可能です。

Athenaでうまくクエリ可能な形式

{"CHANGE":1.15,"PRICE":22.5,"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY"}
{"CHANGE":2.07,"PRICE":72.58,"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL"}
{"CHANGE":8.79,"PRICE":97.89,"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY"}
{"CHANGE":-2.35,"PRICE":173.81,"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY"}

この形式であればAthenaのクエリエンジンがデータのパースに使用しているHive JSON SerDeに対応した形式となるためです。

The Hive JSON SerDe is commonly used to process JSON data like events. These events are represented as single-line strings of JSON-encoded text separated by a new line. The Hive JSON SerDe does not allow duplicate keys in map or struct key names.

そこで今回は、Delivery StreamのJSONが1行につながった形式のレコードを、Athenaで読み取り可能なJSON Lines形式に変換して出力する構成を作成してみます。

やってみた

マネジメントコンソールから次のような構成を作成してみます。

Lambda関数の作成

まずJSON形式のレコードをJSON Lines形式に変換するLambda関数を作成します。

マネジメントコンソールのLambda関数の作成ページで、[Runtime]でNode.js 14.xを選択し、[Create function]をクリックします。

以下のNode.jsのコードをindex.jsに指定します。レコードのJSONの末尾に改行コード(\n)を連結するスクリプトです。

index.js

exports.handler = (event, context, callback) => {
  const output = event.records.map((record) => {
    
    //レコードのJSONの末尾に改行コード(\n)を連結する
    let entry = (new Buffer(record.data, 'base64')).toString('utf8');
    let result = entry + '\n'
    const payload = (new Buffer(result, 'utf8')).toString('base64');
        
      return {
          recordId: record.recordId,
          result: 'Ok',
          data: payload,
        };
  });
  console.log(`Processing completed.  Successful records ${output.length}.`);
  callback(null, { records: output });
};

[Deploy]をクリックして関数をデプロイします。

[Configuration]タブ-[General configuration]で、[Edit]をクリックします。

[Timeout]を余裕を見て1分を指定して、[Save]をクリックして保存します。

保存できました。

Delivery Streamの作成

次にDelivery Streamを作成します。

マネジメントコンソールのDelivery streamsのページで、[Create delivery stream]をクリックします。

Create a delivery stream画面が開きます。[Choose source and destination]で[Source]で今回は入力レコードを手動で投入するのでDirect PUTを選択し、[Destination]でAmazon S3を選択します。

[Transform and convert records]-[Transform source records with AWS Lambda]で、[Data transformation]でEnabledを選択し、[Browse]をクリックします。

先程作成したLambda関数を選択して、[Choose]をクリックします。

Lambda関数を指定できました。

[Destination settings]で、[S3 bucket]で出力先のS3バケットを指定し、[S3 bucket prefix]と[S3 bucket error output prefix]で出力先プレフィクスを指定します。

[Create delivery stream]をクリックしてDelivery Streamの作成します。

作成できました。

動作確認

Delivery Streamに対してテストデータを送信します。

[Start sending demo data]をクリックして送信を開始します。

数秒ほど送信し続けたら[Stop sending demo data]をクリックして送信を停止します。

数分経つと対象のバケットのプレフィクスにオブジェクトが作成されました。

オブジェクトをダウンロードして開いてみると、JSONオブジェクト間に改行が挿入され、JSON Linesとなっています。

{"CHANGE":1.15,"PRICE":22.5,"TICKER_SYMBOL":"MMB","SECTOR":"ENERGY"}
{"CHANGE":2.07,"PRICE":72.58,"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL"}
{"CHANGE":8.79,"PRICE":97.89,"TICKER_SYMBOL":"ALY","SECTOR":"ENERGY"}
{"CHANGE":-2.35,"PRICE":173.81,"TICKER_SYMBOL":"BNM","SECTOR":"TECHNOLOGY"}

注意点

Lambda関数のindex.jsの出力形式は次の記述の仕様に則っている必要があります。

例えば出力のresultフィールドを省略すると、The result field cannot be nullというエラーとなります。

index.js

      return {
          recordId: record.recordId,
          data: payload,
        };

またresultフィールドの値がOKのような不正な値だとCheck your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailedというエラーとなります。

index.js

      return {
          recordId: record.recordId,
          result: 'OK',
          data: payload,
        };

参考

以上