AWS GlueでネストされたJSONファイルをCSVファイルやParquetに変換する

2018.01.17

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS GlueのRelationalizeというTransformを利用して、ネストされたJSONをCSVファイルやParquetに変換する方法をご紹介します。CSV形式に変換することでリレーショナルデータベースに簡単にインポートできます。また、Parquetフォーマットに変換することでAthena、Redshift Spectrum、EMRからより高速にクエリできるようになります。

Relationalizeとは

Relationalizeは、AWS Glueが提供するRelationalizeというTransformで、DynamicFrameをリレーショナル(行と列)形式に変換します。データのスキーマに基づいて、ネストした構造化データをフラットな構造化データに変換してDynamicFrameを作成します。出力は、複数の表にデータを書き込むことができる、DynamicFrameのコレクションです。

【レポート】AWS GlueによるサーバレスETL #reinvent #ABD315

サンプル1:ネストされたJSON

変換したいネストしたJSONのデータは以下のとおりです。なお、AWS GlueのSparkは、JSONをminify形式、prettify形式の何れの形式でも変換可能です。

{
"player": {
"username": "user1",
"characteristics": {
"race": "Human",
"class": "Warlock",
"subclass": "Dawnblade",
"power": 300,
"playercountry": "USA"
},
"arsenal": {
"kinetic": {
"name": "Sweet Business",
"type": "Auto Rifle",
"power": 300,
"element": "Kinetic"
},
"energy": {
"name": "MIDA Mini-Tool",
"type": "Submachine Gun",
"power": 300,
"element": "Solar"
},
"power": {
"name": "Play of the Game",
"type": "Grenade Launcher",
"power": 300,
"element": "Arc"
}
},
"armor": {
"head": "Eye of Another World",
"arms": "Philomath Gloves",
"chest": "Philomath Robes",
"leg": "Philomath Boots",
"classitem": "Philomath Bond"
},
"location": {
"map": "Titan",
"waypoint": "The Rig"
}
}
}

サンプル2:フラット化されたJSON

以下のような、フラットなJSONに変換します。最終的には、この形式をCSVやParquetにファイル出力します。

{
"player.username": "user1",
"player.characteristics.race": "Human",
"player.characteristics.class": "Warlock",
"player.characteristics.subclass": "Dawnblade",
"player.characteristics.power": 300,
"player.characteristics.playercountry": "USA",
"player.arsenal.kinetic.name": "Sweet Business",
"player.arsenal.kinetic.type": "Auto Rifle",
"player.arsenal.kinetic.power": 300,
"player.arsenal.kinetic.element": "Kinetic",
"player.arsenal.energy.name": "MIDA Mini-Tool",
"player.arsenal.energy.type": "Submachine Gun",
"player.arsenal.energy.power": 300,
"player.arsenal.energy.element": "Solar",
"player.arsenal.power.name": "Play of the Game",
"player.arsenal.power.type": "Grenade Launcher",
"player.arsenal.power.power": 300,
"player.arsenal.power.element": "Arc",
"player.armor.head": "Eye of Another World",
"player.armor.arms": "Philomath Gloves",
"player.armor.chest": "Philomath Robes",
"player.armor.leg": "Philomath Boots",
"player.armor.classitem": "Philomath Bond",
"player.location.map": "Titan",
"player.location.waypoint": "The Rig"
}

JSONファイルからDynamicFrameを作成する方法

ファイルから直接DynamicFrameを生成する

ネストしていないキーバリュー形式のJSONファイルは、sparkSession.read.json()で、DataFrameに変換できます。しかし、ネストしたJSONファイルは、この方式ではネストしたJSON形式を認識できません。

_datasource0 = spark.read.json('s3://mybucket/players/players.json')
datasource0 = DynamicFrame.fromDF(_datasource0, glueContext, 'datasource0')

GlueデータカタログのテーブルからDynamicFrameを生成する

AWS GlueのコンソールからデータソースにGlueデータカタログのテーブル、データターゲットにS3(JSON)を指定すると、ApplyMapping.apply()にカラムの対応付けしたmappings引数を指定したコードが自動生成されます。

ApplyMapping.apply()でDynamicFrameを生成する

GlueデータカタログのテーブルからDynamicFrameを生成して、ApplyMapping.apply()で、フラットなJSONのDynamicFrameに変換します。フラットなJSONのDynamicFrameに変換するためのルールは、AWS Glueのコンソールのmappings引数で指定したキーと値に従います。mappings引数は、Glueが自動生成していますが、手で書くとなると辛い作業です。

applymapping1 = ApplyMapping.apply(
frame = datasource_df,
mappings = [
("player.username", "string", "`player.username`", "string"),
("player.characteristics.race", "string", "`player.characteristics.race`", "string"),
("player.characteristics.class", "string", "`player.characteristics.class`", "string"),
("player.characteristics.subclass", "string", "`player.characteristics.subclass`", "string"),
("player.characteristics.power", "int", "`player.characteristics.power`", "int"),
("player.characteristics.playercountry", "string", "`player.characteristics.playercountry`", "string"),
("player.arsenal.kinetic.name", "string", "`player.arsenal.kinetic.name`", "string"),
("player.arsenal.kinetic.type", "string", "`player.arsenal.kinetic.type`", "string"),
("player.arsenal.kinetic.power", "int", "`player.arsenal.kinetic.power`", "int"),
("player.arsenal.kinetic.element", "string", "`player.arsenal.kinetic.element`", "string"),
("player.arsenal.energy.name", "string", "`player.arsenal.energy.name`", "string"),
("player.arsenal.energy.type", "string", "`player.arsenal.energy.type`", "string"),
("player.arsenal.energy.power", "int", "`player.arsenal.energy.power`", "int"),
("player.arsenal.energy.element", "string", "`player.arsenal.energy.element`", "string"),
("player.arsenal.power.name", "string", "`player.arsenal.power.name`", "string"),
("player.arsenal.power.type", "string", "`player.arsenal.power.type`", "string"),
("player.arsenal.power.power", "int", "`player.arsenal.power.power`", "int"),
("player.arsenal.power.element", "string", "`player.arsenal.power.element`", "string"),
("player.armor.head", "string", "`player.armor.head`", "string"),
("player.armor.arms", "string", "`player.armor.arms`", "string"),
("player.armor.chest", "string", "`player.armor.chest`", "string"),
("player.armor.leg", "string", "`player.armor.leg`", "string"),
("player.armor.classitem", "string", "`player.armor.classitem`", "string"),
("player.location.map", "string", "`player.location.map`", "string"),
("player.location.waypoint", "string", "`player.location.waypoint`", "string")
],
transformation_ctx = "applymapping1")

Relationalize.apply()でDynamicFrameを生成する

GlueデータカタログのテーブルからDynamicFrameを生成して、Relationalize.apply()で、フラットなJSONのDynamicFrameCollectionに変換します。フラットなJSONのDataFrameに変換するためのルールは、ネストしたJSONの項目をピリオドで区切ったキーと値に従います。

dfc_root_table_name = "root" # default value is "roottable"
relationalize1 = Relationalize.apply(
frame = datasource_df,
staging_path = "s3://mybucket/glue/tmp",
name = dfc_root_table_name,
transformation_ctx = "relationalize1")
result_data = relationalize1.select(dfc_root_table_name)

ApplyMapping.apply()は、入出力のキーの対応を引数で渡す必要がありますが、Relationalize.apply()はネストしたJSONの項目をピリオドで区切った文字をキーにするので、コードが簡潔になりました。このコードは変換の設定が不要なので、ソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに変換する機能を共通化できます。よって、以降ではこのRelationalizeを用いた方法を解説します。

Relationalizeを用いた変換コードの解説

ネストされたJSONからCSVファイルに変換する

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Begin variables to customize with your information
glue_source_database = "default"
glue_source_table = "players"
glue_temp_storage = "s3://mybucket/glue/tmp"
dfc_root_table_name = "root" # default value is "roottable"
# End variables to customize with your information

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# glueContextで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")

# Glueのフレームワークで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する
relationalize1 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "relationalize1")
result_data = relationalize1.select(dfc_root_table_name)

# Write CSV
glue_relationalize_output_s3_path = "s3://mybucket/players_csv"
datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "csv", transformation_ctx = "datasink2")

job.commit()

ネストされたJSONからParquetファイルに変換する

上記のコードの29〜31行目を下記に置き換えることで、Parquetフォーマットに変換できます。

# Write Parquet
glue_relationalize_output_s3_path = "s3://mybucket/players_parquet"
datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "parquet", transformation_ctx = "datasink2")

最後に

Relationalizeを用いて、ネストされたJSONファイルをCSVファイルやParquetに変換する方法をご紹介しました。AWS GlueのRelationalizeはソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに簡単に変換できます。JSONファイルに対してより大きなデータを効率よくクエリするには、構造化データに変換することでパフォーマンスやスキャン性能を改善します。さらにスキャンのサイズを小さくすることで、Redshift SpectrumやAthenaにおいては利用費の削減できますので、蓄積データについては変換することをご検討してください。

参考