AWS GlueでネストされたJSONファイルをCSVファイルやParquetに変換する
はじめに
AWS GlueのRelationalizeというTransformを利用して、ネストされたJSONをCSVファイルやParquetに変換する方法をご紹介します。CSV形式に変換することでリレーショナルデータベースに簡単にインポートできます。また、Parquetフォーマットに変換することでAthena、Redshift Spectrum、EMRからより高速にクエリできるようになります。
Relationalizeとは
Relationalizeは、AWS Glueが提供するRelationalizeというTransformで、DynamicFrameをリレーショナル(行と列)形式に変換します。データのスキーマに基づいて、ネストした構造化データをフラットな構造化データに変換してDynamicFrameを作成します。出力は、複数の表にデータを書き込むことができる、DynamicFrameのコレクションです。
サンプル1:ネストされたJSON
変換したいネストしたJSONのデータは以下のとおりです。なお、AWS GlueのSparkは、JSONをminify形式、prettify形式の何れの形式でも変換可能です。
{ "player": { "username": "user1", "characteristics": { "race": "Human", "class": "Warlock", "subclass": "Dawnblade", "power": 300, "playercountry": "USA" }, "arsenal": { "kinetic": { "name": "Sweet Business", "type": "Auto Rifle", "power": 300, "element": "Kinetic" }, "energy": { "name": "MIDA Mini-Tool", "type": "Submachine Gun", "power": 300, "element": "Solar" }, "power": { "name": "Play of the Game", "type": "Grenade Launcher", "power": 300, "element": "Arc" } }, "armor": { "head": "Eye of Another World", "arms": "Philomath Gloves", "chest": "Philomath Robes", "leg": "Philomath Boots", "classitem": "Philomath Bond" }, "location": { "map": "Titan", "waypoint": "The Rig" } } }
サンプル2:フラット化されたJSON
以下のような、フラットなJSONに変換します。最終的には、この形式をCSVやParquetにファイル出力します。
{ "player.username": "user1", "player.characteristics.race": "Human", "player.characteristics.class": "Warlock", "player.characteristics.subclass": "Dawnblade", "player.characteristics.power": 300, "player.characteristics.playercountry": "USA", "player.arsenal.kinetic.name": "Sweet Business", "player.arsenal.kinetic.type": "Auto Rifle", "player.arsenal.kinetic.power": 300, "player.arsenal.kinetic.element": "Kinetic", "player.arsenal.energy.name": "MIDA Mini-Tool", "player.arsenal.energy.type": "Submachine Gun", "player.arsenal.energy.power": 300, "player.arsenal.energy.element": "Solar", "player.arsenal.power.name": "Play of the Game", "player.arsenal.power.type": "Grenade Launcher", "player.arsenal.power.power": 300, "player.arsenal.power.element": "Arc", "player.armor.head": "Eye of Another World", "player.armor.arms": "Philomath Gloves", "player.armor.chest": "Philomath Robes", "player.armor.leg": "Philomath Boots", "player.armor.classitem": "Philomath Bond", "player.location.map": "Titan", "player.location.waypoint": "The Rig" }
JSONファイルからDynamicFrameを作成する方法
ファイルから直接DynamicFrameを生成する
ネストしていないキーバリュー形式のJSONファイルは、sparkSession.read.json()
で、DataFrameに変換できます。しかし、ネストしたJSONファイルは、この方式ではネストしたJSON形式を認識できません。
_datasource0 = spark.read.json('s3://mybucket/players/players.json') datasource0 = DynamicFrame.fromDF(_datasource0, glueContext, 'datasource0')
GlueデータカタログのテーブルからDynamicFrameを生成する
AWS GlueのコンソールからデータソースにGlueデータカタログのテーブル、データターゲットにS3(JSON)を指定すると、ApplyMapping.apply()にカラムの対応付けしたmappings引数を指定したコードが自動生成されます。
ApplyMapping.apply()でDynamicFrameを生成する
GlueデータカタログのテーブルからDynamicFrameを生成して、ApplyMapping.apply()で、フラットなJSONのDynamicFrameに変換します。フラットなJSONのDynamicFrameに変換するためのルールは、AWS Glueのコンソールのmappings引数で指定したキーと値に従います。mappings引数は、Glueが自動生成していますが、手で書くとなると辛い作業です。
applymapping1 = ApplyMapping.apply( frame = datasource_df, mappings = [ ("player.username", "string", "`player.username`", "string"), ("player.characteristics.race", "string", "`player.characteristics.race`", "string"), ("player.characteristics.class", "string", "`player.characteristics.class`", "string"), ("player.characteristics.subclass", "string", "`player.characteristics.subclass`", "string"), ("player.characteristics.power", "int", "`player.characteristics.power`", "int"), ("player.characteristics.playercountry", "string", "`player.characteristics.playercountry`", "string"), ("player.arsenal.kinetic.name", "string", "`player.arsenal.kinetic.name`", "string"), ("player.arsenal.kinetic.type", "string", "`player.arsenal.kinetic.type`", "string"), ("player.arsenal.kinetic.power", "int", "`player.arsenal.kinetic.power`", "int"), ("player.arsenal.kinetic.element", "string", "`player.arsenal.kinetic.element`", "string"), ("player.arsenal.energy.name", "string", "`player.arsenal.energy.name`", "string"), ("player.arsenal.energy.type", "string", "`player.arsenal.energy.type`", "string"), ("player.arsenal.energy.power", "int", "`player.arsenal.energy.power`", "int"), ("player.arsenal.energy.element", "string", "`player.arsenal.energy.element`", "string"), ("player.arsenal.power.name", "string", "`player.arsenal.power.name`", "string"), ("player.arsenal.power.type", "string", "`player.arsenal.power.type`", "string"), ("player.arsenal.power.power", "int", "`player.arsenal.power.power`", "int"), ("player.arsenal.power.element", "string", "`player.arsenal.power.element`", "string"), ("player.armor.head", "string", "`player.armor.head`", "string"), ("player.armor.arms", "string", "`player.armor.arms`", "string"), ("player.armor.chest", "string", "`player.armor.chest`", "string"), ("player.armor.leg", "string", "`player.armor.leg`", "string"), ("player.armor.classitem", "string", "`player.armor.classitem`", "string"), ("player.location.map", "string", "`player.location.map`", "string"), ("player.location.waypoint", "string", "`player.location.waypoint`", "string") ], transformation_ctx = "applymapping1")
Relationalize.apply()でDynamicFrameを生成する
GlueデータカタログのテーブルからDynamicFrameを生成して、Relationalize.apply()で、フラットなJSONのDynamicFrameCollectionに変換します。フラットなJSONのDataFrameに変換するためのルールは、ネストしたJSONの項目をピリオドで区切ったキーと値に従います。
dfc_root_table_name = "root" # default value is "roottable" relationalize1 = Relationalize.apply( frame = datasource_df, staging_path = "s3://mybucket/glue/tmp", name = dfc_root_table_name, transformation_ctx = "relationalize1") result_data = relationalize1.select(dfc_root_table_name)
ApplyMapping.apply()は、入出力のキーの対応を引数で渡す必要がありますが、Relationalize.apply()はネストしたJSONの項目をピリオドで区切った文字をキーにするので、コードが簡潔になりました。このコードは変換の設定が不要なので、ソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに変換する機能を共通化できます。よって、以降ではこのRelationalizeを用いた方法を解説します。
Relationalizeを用いた変換コードの解説
ネストされたJSONからCSVファイルに変換する
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job # Begin variables to customize with your information glue_source_database = "default" glue_source_table = "players" glue_temp_storage = "s3://mybucket/glue/tmp" dfc_root_table_name = "root" # default value is "roottable" # End variables to customize with your information args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # glueContextで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0") # Glueのフレームワークで、Glueデータカタログのテーブルからデータをロードして、DynamicFrameを生成する relationalize1 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "relationalize1") result_data = relationalize1.select(dfc_root_table_name) # Write CSV glue_relationalize_output_s3_path = "s3://mybucket/players_csv" datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "csv", transformation_ctx = "datasink2") job.commit()
ネストされたJSONからParquetファイルに変換する
上記のコードの29〜31行目を下記に置き換えることで、Parquetフォーマットに変換できます。
# Write Parquet glue_relationalize_output_s3_path = "s3://mybucket/players_parquet" datasink2 = glueContext.write_dynamic_frame.from_options(frame = result_data, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "parquet", transformation_ctx = "datasink2")
最後に
Relationalizeを用いて、ネストされたJSONファイルをCSVファイルやParquetに変換する方法をご紹介しました。AWS GlueのRelationalizeはソースデータとターゲットデータを切り変えるだけで、ネストされたJSONデータをキーバリューのJSONデータに簡単に変換できます。JSONファイルに対してより大きなデータを効率よくクエリするには、構造化データに変換することでパフォーマンスやスキャン性能を改善します。さらにスキャンのサイズを小さくすることで、Redshift SpectrumやAthenaにおいては利用費の削減できますので、蓄積データについては変換することをご検討してください。