Kinesis Data Firehoseの「動的パーティショニング機能」でJSONが1行で繋がった形式をパース処理させる

Kinesis Data Firehoseの「動的パーティショニング機能」でJSONが1行で繋がった形式をパース処理させる

Lambda等を使わずにKinesis Data Firehoseの動的パーティショニング機能でパースしてみる
Clock Icon2023.03.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、洲崎です。
Kinesis Data Firehoseから流れるJSONが1行で繋がった形式に対して、動的パーティショニング(Dynamic Partitioning)機能でパース処理させることができたので共有します。

もっと楽にKinesis Data Firehoseの1行で繋がったJSONをパースさせたい

前回、Amazon Connect→Amazon Kinesis Data Firehose→S3の構成で、複数のJSONが1行にまとまってしまうからGlueでパース処理を試してみた記事を上げました。

ただ、Glueの学習コストだったり、AthenaのPartition Projectionを利用しているとそこは別で対策が必要だったり、構成が複雑になっていました。
(Glue自体を学べたのは良かったです)
今回はGlueやLambdaを使わずとも、Kinesis Data Firehoseの動的パーティショニング機能を使うことでパースできたので紹介します。

結論

  • Kinesis Data Firehoseの動的パーティショニングでパース処理を行うことはできる
  • Kinesis Data Firehoseの動的パーティショニングは後から設定することはできず、作成時のみ設定できる(すでにある場合は再作成が必要)
  • 動的パーティショニングキーを設ける必要がある
    • 今回はファイルの先頭に"-"を入れる形で対応

Kinesis Data Firehoseの動的パーティショニング機能

Kinesis Data Firehoseの動的パーティショニング機能は、Kinesis Data Firehose内でストリーミングデータをパーティショニングしてS3に配信する機能です。
JSONのインライン解析や、パーティションキーの設定などFirehose内でETLの処理を行うことができます。

今回はこちらの動的パーティショニング、マルチレコードのディスアグリゲーション、改行の区切り文字を使うことで、パース処理を行うことができました。

Firehoseの設定

動的パーティショニング機能はKinesis Data Firehoseを作成するときのみ設定可能です。
Firehoseを作成する画面で各機能を有効にします。

  • 動的パーティショニング
    • 有効
  • マルチレコードのディスアグリゲーション
    • 有効
    • JSON
  • 改行の区切り文字
    • 有効


動的パーティショニングキー

動的パーティショニングを行う際、動的パーティショニングキーが必要になります。
JSONのインライン解析を有効にし、JQ式を下記とし、ファイル名の先頭に"-"をつける形としました。
(こちらのブログのアイデアを拝借しました)

  • JSONのインライン解析
    • 有効

キー名 JQ式
key1 {dummy: ("-")} | .dummy


S3バケットプレフィックス

S3バケットのプレフィックスは動的パーティショニングを利用しない場合のパーティションと同一にしました。
yyyy/MM/dd/HHの後の動的パーティションキーのkey1/をつけない形で設定することで、余分にパーティションされることなく、ファイル名の頭に"-"が付く形になります。

S3バケットプレフィックス S3バケットエラー出力プレフィックス
contact-trace-records/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/!{timestamp:HH}/!{partitionKeyFromQuery:key1} !{firehose:error-output-type}


テスト

何件かテストコールをかけてみた後に、S3バケットを見てみます。
CTRデータがyyyy/MM/dd/HHのパーティションで格納されており、ファイル名の先頭に"-"がついていました。

ファイルを開くと、改行されている確認がとれました!
(ファイルが長いので加工しています)

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}

補足

今回、動的パーティショニングの設定で何パターンか分けて試してみたので備忘までに記載します。

パターン1 マルチレコードのディスアグリゲーションは「無効」、改行が「有効」の場合

  • マルチレコードのディスアグリゲーション
    • 有効ではありません
  • 改行の区切り文字
    • 有効
  • JSONのインライン解析
    • 有効
  • 2回コールした結果
    • 一番下だけ+1行改行される(失敗)
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}
/n
  • 3回コールした結果
    • すべて+1行改行される(失敗)
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/333333333}
/n

パターン2 マルチレコードのディスアグリゲーションは「有効」、改行が「無効」の場合

  • マルチレコードのディスアグリゲーション
    • 有効
    • JSON
  • 改行の区切り文字
    • 有効ではありません
  • JSONのインライン解析
    • 有効
  • 2回コールした結果
    • パースされる(成功)
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}
  • 3回コールした結果
    • 1行になる(失敗)
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/333333333}

パターン3 マルチレコードのディスアグリゲーションは「有効」,改行が「有効」の場合

  • マルチレコードのディスアグリゲーション
    • 有効
    • JSON
  • 改行の区切り文字
    • 有効
  • JSONのインライン解析
    • 有効
  • 2回コールした結果
    • パースされる(成功)
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}
  • 3回コールした結果
    • パースされる(成功)
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/333333333}

最後に

Kinesis Data Firehoseの動的パーティショニング機能でパース処理してみました。
LambdaやGlue等使わずにシンプルに実装できるのが良いですね。
あとは今後のアップデート等で、動的パーティショニングキーをつけずにデフォルトでパースしてくれるとありがたいなと思いました。

ではまた!コンサルティング部の洲崎でした。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.