Kinesis Data FirehoseのJSONが1行で繋がった形式をAWS Glueジョブでパース処理出来ないか検証してみた

AWS Glueをコネコネしてみる
2023.03.27

こんにちは、洲崎です。
JSONの中身をパース処理させる為に、AWS Glueジョブで色々調査したので共有します。

複数のJSONが1行で繋がっていてAthenaで処理出来ない

Amazon Connect→Amazon Kinesis Data Firehose→S3でAmazon ConnectのCTR(問い合わせレコードデータモデル)を出力しています。
CTRデータをQuickSight(Athena)から見た時にレコード数が少ない事が判明しました。
原因調査を進めると、Amazon Kinesis Data FirehoseからAmazon S3にCTRデータを出力する際に、複数のJSONが1行に繋がっており、Athenaが処理できていないことが判明しました。
※複数のJSONが1行にまとまる件はAmazon Kinesis Data FirehoseのDelivery Streamの仕様が原因の為、Amazon Connectに限らず、下記記事のように他のサービスでも起こりうる話になります。

以下、CTRの場合の例をあげます。(CTRはデータが沢山あるので、分かりやすくAgentARNまでで省略します)
パースしていない(1行に繋がっている)場合

既定の形式

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}

パースしている(改行されている)場合

Athenaでクエリ可能な形式

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}

Athenaはパースしている(改行されている)場合にクエリ可能です。
打開策として、Kinesis Data FirehoseやS3にプットされたタイミングでパース処理を行うLambdaを実装することが上げられますが、Lambda以外で実装できる方法はないか考え、タイトルで上げたGlueジョブでできないかを調査しました。

結論

  • Glueジョブを入れることでパース処理を行うことはできる
  • AthenaのPartition Projectionを利用している場合、Glueはそれを認識することができない
    • ALTER TABLE ADD PARTITIONで別でパーティション化をすることが必要
  • JSON → Glue(DynamicFrame) → JSONで変換を行う為、必ずしも同じデータになっているかは確認が必要

構成

構成は2種類考えられます。
1つ目はデータソース・ターゲット共にS3の場合です。
Kinesis Data Firehoseで出力されたS3バケットのデータに対して、Glueジョブで変換し、新しいS3バケットに出力します。
データソース:S3、ターゲット:S3

2つ目はデータソース:Glueテーブル、ターゲットS3の場合です。
Kinesis Data Firehoseで出力されたS3バケットをGlueテーブルのソース元としてセットし新しいS3バケットに出力します。
出力先をパーティション化して格納したい場合はこちらを利用します。
データソース:Glueテーブル、ターゲット:S3

やってみる

Glueジョブのデータソース・ターゲット共にS3の場合

データソース・ターゲット共にS3の場合です。
データソースのバケットにはctrデータを格納し、ターゲットは空の状態にしておきます。

  • データソース:test-s3-source20230327
    • ctr-sample


ctr-sampleはパースされていない1行の形式です。

ctr-sample

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}
  • ターゲット:test-s3-target20230327
    • オブジェクトなし


バケットの準備ができたら、マネジメントコンソールからGlueを開きます。
左のサイドメニューの「ETL jobs」をクリックします。

Jobsの画面になるので、SourceとTargetが「Amazon S3」を指定していることを確認し、右上の「Create」をクリックします。

GUIでデータ変換を指定する画面が開きます。
Data sourceのS3のところで、S3 URLをs3://test-s3-source20230327、Data formatをJSONで指定します。

TransformはApplyMappingのままです。GlueはDynamicFrameというAWS Glueで独自のデータ表現ができ、各項目のデータタイプを変換することも可能です。
今回はデータタイプは変換しない為、デフォルトのままで進めます。(デフォルトのまま進めても自動でパース処理を行ってくれます)

Data targetで、FormatをJSON、S3 URLをs3://test-s3-target20230327で指定します。

あとは右上のSaveを押してから、Runをクリックします。

Runを押した後は、左のメニューのMonitoringから対象のジョブを確認することができます。

処理が完了して、ターゲットのS3バケットを見ると、新しいファイルが生成されています。

中身をみると、パースされていることが確認できました!

run-xxxxxxxxxxx-part-r-00000

{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/111111111}
{"AWSAccountId":"xxxxxxxxxxx","AWSContactTraceRecordFormatVersion":"2017-03-10","AgentARN":"arn:aws:connect:ap-northeast-1:xxxxxxxxx:instance/222222222}

データソースがGlueテーブル・ターゲットがS3の場合

GlueジョブのデータソースがGlueテーブル・ターゲットがS3の場合です。
出力先をパーティション化して格納したい場合に有効です。

まずはCREATE EXTERNAL TABLEでGlueテーブルを作成します。
今回は下記のテーブル定義をAthenaのテキストエディタに貼り付けて、クエリ実行で作成しました。

CREATE EXTERNAL TABLE `ctr20230327`(
  `awsaccountid` string COMMENT 'from deserializer', 
  `awscontacttracerecordformatversion` string COMMENT 'from deserializer', 
  `agentarn` string COMMENT 'from deserializer')
PARTITIONED BY ( 
  `year` int, 
  `month` int, 
  `day` int)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='AWSAccountId,AWSContactTraceRecordFormatVersion,AgentARN') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://test-s3-source20230327/contact-trace-records'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='ctr', 
  'averageRecordSize'='70', 
  'classification'='json', 
  'compressionType'='none', 
  'objectCount'='1', 
  'recordCount'='4', 
  'sizeKey'='282', 
  'transient_lastDdlTime'='1679576383', 
  'typeOfData'='file')

Athenaのクエリエディタからテーブル定義をみると、year,month,dayがパーティション化(AthenaのPartition Projection)されています。

テーブル定義で作成した形で、データソースのS3バケットも下記階層で整えました。
test-s3-source20230327/contact-trace-records/2023/03/27/ctr-sample

ただ、この準備だとGlueはAthenaのPartition Projectionを読むことができないので、別の形でGlueテーブルをパーティション化させておく必要があります。
今回は下記で紹介されているALTER TABLE ADD PARTITIONを利用します。

記事の中にあるcreate_partitions.pyの下記値を修正し、CloudShellにアップロードしpython3 create_partitions.pyで実行します。

dbname = 'default'
table_name = 'ctr20230327'
result_location = 's3://aws-athena-query-results-xxxxxxxxxxx-ap-northeast-1'
bucket = 's3://test-s3-source20230327/contact-trace-records/'
start = datetime(2023, 3, 1)
end = datetime(2023, 3, 31)

下記のように結果が表示されれば成功です。

SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=1) location 's3://test-s3-source20230327/contact-trace-records/2023/03/01'
SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=2) location 's3://test-s3-source20230327/contact-trace-records/2023/03/02'
SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=3) location 's3://test-s3-source20230327/contact-trace-records/2023/03/03'
SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=4) location 's3://test-s3-source20230327/contact-trace-records/2023/03/04'
....
SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=28) location 's3://test-s3-source20230327/contact-trace-records/2023/03/28'
SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=29) location 's3://test-s3-source20230327/contact-trace-records/2023/03/29'
SUCCEEDED: ALTER TABLE ctr20230327 ADD IF NOT EXISTS PARTITION (year=2023,month=3,day=30) location 's3://test-s3-source20230327/contact-trace-records/2023/03/30'

ここまでできたら、マネジメントコンソールでGlueのジョブ画面を開きます。
Data sourceはData Catalog tableを選択し、DatabaseとTableを指定します。

ApplyMappingはData Catalog tableでテーブル定義されている値を確認することができます。今回もデフォルトのままで進めます。

Data targetではJSONとS3 Target LocationでS3のURLを入力し、Partitionでyear,month,dayをそれぞれ選択します。

結果、test-s3-target20230327/contact-trace-records/year=2023/month=3/day=27/を通って、パース処理をしたデータを格納することができました!

注意点

2点目のパーティション化の格納はAthenaのPartition Projectionを利用しているわけではなく、ALTER TABLE ADD PARTITIONでパーティション化されている為、既存の運用に合うかは別途検証が必要です。
また、結論であげた通り、JSON → Glue(DynamicFrame) → JSONで変換を行う為、必ずしも同じデータになっているかは確認が必要です。

最後に

今回はLambdaを使わずに、Glueでパース処理できないか試してみました。
少し複雑化しているのと考慮する点もあるので、Lambdaを書いた方がスムーズな方はLambdaを書いた方が早いし運用もしやすいかもです。
ただGlueを使ってやってみたい、コードを書かずに実装してみたい方がいましたら参考にしてみてください。

ではまた!コンサルティング部の洲崎でした。

参考