AWS Glueでオブジェクトの配列があるJSONをフラット化する

AWS GlueのRelationalize機能で、オブジェクトの配列があるJSON Lines(改行区切りのJSON)ファイルをリレーショナル型に変換し、TSVファイルとして出力する。
2021.12.23

データアナリティクス事業本部、池田です。
JSONのフラット化でけっこう苦労したのでブログにします。 オブジェクトの配列(後述)が含まれているような、やや複雑なJSONをフラット化します。

↓基本の部分は以下の公式の記事です。
AWS Glue でリレーショナル変換後にピボットされたデータを使用するにはどうすればよいですか?

対象のJSON

データ構造

↓今回使用するサンプルファイルはこんな感じです。

{"id":"001","name":"池田","profile":{"age":17},"friends":[{"id":"002","name":"山田"},{"id":"003","name":"竹田"}]}
{"id":"004","name":"渡部","profile":{"age":64},"friends":[{"id":"005","name":"渡辺"},{"id":"006","name":"渡邉"}]}
{"id":"007","name":"吉川","profile":{"level":99},"extraField":"foo"}

JSON Lines改行区切りのJSON)の形式のファイルを使用しています。 (通常のJSONファイルだとAmazon Athenaだと エラー 、AWS Glueでは正常にロードできませんでした。)

↓参考用に、通常のJSONにして整形したもの(一部省略)。

[
    {
        "id": "001",
        "name": "池田",
        "profile": {
            "age": 17
        },
        "friends": [
            {
                "id": "002",
                "name": "山田"
            },
            {
                "id": "003",
                "name": "竹田"
            }
        ]
    },
    …
]

friends 部分が「 idname をフィールドとして持つオブジェクト」の配列になっています。
3つめのid=007のレコードだけ、オブジェクト配列フィールドが無かったり、 フィールド名(key)が違ったり、検証用にイレギュラーなレコードにしています。

Glue Data Catalog定義

AWS Glue では、S3のファイルを直接ロードもできますが、 Data Catalog 経由の方法と両方実装してみます。

データカタログのテーブル定義はこんな感じ。 ( Amazon Athena からの実行を想定。)

CREATE EXTERNAL TABLE my_catalog.my_json_ext (
    id STRING,
    name STRING,
    profile STRUCT<
        age: INT
    >,
    friends ARRAY<
        STRUCT<
            id: STRING,
            name: STRING
        >
    >
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-bucket/input_json/'
;

LOCATION に指定しているバケットに前述のjsonlファイルを置いている想定です。
SerDe は2種類あるようですが、 今回の内容だとOpenX JSON SerDeの方が良さそうです。

余談:Athenaでのフラット化

せっかくデータカタログにテーブルを作成したので、Athenaからも参照してみます。
↓Athenaで実行したSQL。

SELECT id, name,
       profile.age,
       f.id AS f_id, f.name AS f_name
FROM my_catalog.my_json_ext,
     UNNEST(friends) AS t(f)
UNION ALL
SELECT id, name,
       profile.age,
       NULL, NULL
FROM my_catalog.my_json_ext
WHERE friends IS NULL
;

↓結果。

"id","name","age","f_id","f_name"
"001","池田","17","002","山田"
"001","池田","17","003","竹田"
"004","渡部","64","005","渡辺"
"004","渡部","64","006","渡邉"
"007","吉川",,,

さくっとフラット化できましたね。 おまけのつもりで調べましたが、Athenaで十分ならGlue使わなくても良い気がします。たぶん。
UNIONしないきれいな書き方ができたら良かったのですが… LEFT JOINしたら良いのかな… (UNIONの後半のブロックを書かないと、JOINでid=007のレコードが落ちます。)

ちなみに、「SELECT * …」で取得しても、 データカタログに未定義のフィールド(id=007の extraFieldlevel )は取得できませんでした。

Glue実装

↓最終的なコードは以下のようになりました。環境は AWS Glue 3.0(Spark 3.1.1, Python 3.7) です。

クリックでコードを表示する/折りたたむ
import re
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame

# パラメータ
DATABASE_NAME = "my_catalog"  # データカタログDB名
TABLE_NAME = "my_json_ext"  # データカタログテーブル名
# INPUT_BUCKET = "my-bucket"  # 入力バケット名
# INPUT_PATH = "input_json/"  # 入力バケットパス
OUTPUT_BUCKET = "my-bucket"  # 出力バケット名
OUTPUT_PATH = "output_tsv/"  # 出力バケットパス
COL_ORDER = ["id", "name", "profile.age", "friends.id", "friends.name"]  # 出力カラム名とそれらの順番
OBJ_ARRAY_COL = "friends"  # オブジェクト配列のフィールド名

GLUE_TEMP_STORAGE = "s3://my-bucket/tmp/glue_relationalize/"


def main():
    glue_context = GlueContext(SparkContext.getOrCreate())

    # パラメータ調整
    col_order = [re.sub("^{}\\.".format(OBJ_ARRAY_COL), "{}.val.".format(OBJ_ARRAY_COL), i) if OBJ_ARRAY_COL in i else i for i in COL_ORDER]  # オブジェクト配列は「.val」でアクセス
    col_order = ["`{}`".format(i) if "." in i else i for i in col_order]  # 「.」入りの列名はバッククウォート
    print("output header: {}".format(col_order))

    # A. DynamicFrame作成(Glueデータカタログから)
    dyf = glue_context.create_dynamic_frame_from_catalog(
        database=DATABASE_NAME, table_name=TABLE_NAME)
    print("loading succeed: {}.{}".format(DATABASE_NAME, TABLE_NAME))

    # B. DynamicFrame作成(S3のJSONから)
    # dyf = glue_context.create_dynamic_frame_from_options(
    #     "s3", {"paths": ["s3://{}/{}".format(INPUT_BUCKET, INPUT_PATH)]}, "json")
    # print("loading succeed: s3://{}/{}".format(INPUT_BUCKET, INPUT_PATH))
    print("dynamic_frame row num: {}".format(dyf.count()))  # debug
    print("dynamic_frame schema:")  # debug
    dyf.printSchema()  # debug

    # オブジェクト配列がある場合はフラット化
    if OBJ_ARRAY_COL:
        dfc = dyf.relationalize("root", GLUE_TEMP_STORAGE)
        print("root schema:")  # debug
        dfc.select("root").printSchema()  # debug

        # オブジェクト配列部分のdyf
        obj_array_df_name = "root_{}".format(OBJ_ARRAY_COL)
        print(obj_array_df_name + " schema:")  # debug
        dfc.select(obj_array_df_name).printSchema()  # debug

        # a. dyfは内部結合しかできないので、dfに変換して左外部結合する
        df_main = dfc.select("root").toDF()
        # もとのJSONのフィールドkeyにidがあるとバッティングしてしまうのでリネームしてから結合
        df_sub = dfc.select(obj_array_df_name).rename_field("id", "_flatten_id") \
                                              .rename_field("index", "_flatten_index") \
                                              .toDF()
        dyf = DynamicFrame.fromDF(df_main.join(df_sub, (df_main["`{}`".format(OBJ_ARRAY_COL)] == df_sub["_flatten_id"]), "left"),
                                  glue_context, "flattened")

        # b. オブジェクト配列部分が必須フィールドならdyfだけで変換できる
        # dyf = dfc.select("root").join("`{}`".format(OBJ_ARRAY_COL), "_flatten_id",
        #                               dfc.select(obj_array_df_name).rename_field("id", "_flatten_id")
        #                                                            .rename_field("index", "_flatten_index"))
        print("flattened row num: {}".format(dyf.count()))  # debug
        print("flattened schema:")  # debug
        dyf.printSchema()  # debug
    else:
        print("OBJ_ARRAY_COL is falsy.")

    # カラムの順番を整列
    dyf = dyf.select_fields(col_order)
    print("ordered schema:")  # debug
    dyf.printSchema()  # debug

    # 書き出し(TSV)
    dyf.write("s3", {"path": "s3://{}/{}".format(OUTPUT_BUCKET, OUTPUT_PATH)},
              "csv", {"separator": "\t", "quoteChar": -1, "writeHeader": True})

    print("writing succeed: s3://{}/{}".format(OUTPUT_BUCKET, OUTPUT_PATH))


if __name__ == "__main__":
    main()

詳細

ポイントだけ説明していきます。


    dyf = glue_context.create_dynamic_frame_from_catalog(
        database=DATABASE_NAME, table_name=TABLE_NAME)

前章で定義したデータカタログから、データをGlueの DynamicFrameロードしています。

また、データカタログを使わない場合は、↓以下のようにS3のパスを指定して直接ロードできます。

    dyf = glue_context.create_dynamic_frame_from_options(
        "s3", {"paths": ["s3://{}/{}".format(INPUT_BUCKET, INPUT_PATH)]}, "json")

どちらの方法でロードしても、触った範囲では特に違いはありませんでした。

この時点でのDynamicFrameのスキーマは以下のようになっており、 データカタログ経由でも未定義のフィールドまでロードされていました。

root
|-- id: string
|-- name: string
|-- profile: struct
|    |-- age: int
|    |-- level: int
|-- friends: array
|    |-- element: struct
|    |    |-- id: string
|    |    |-- name: string
|-- extraField: string

        dfc = dyf.relationalize("root", GLUE_TEMP_STORAGE)

DynamicFrameの Relationalize 機能でフラット化とオブジェクト配列の部分を切り出します
戻り値は DynamicFrameCollection で、今回であれば以下のようなDynamicFramが2つ格納されています。

↓本体部分( dfc.select("root") )。

root
|-- id: string
|-- name: string
|-- profile.age: int
|-- profile.level: int
|-- friends: long
|-- extraField: string

friends がもう一方のDynamicFrameのidと紐づけられるように数値に変わっています。
同時に profile 部分もフラットになっています。

↓オブジェクト配列の部分( dfc.select("root_friends") )。

root
|-- id: long
|-- index: int
|-- friends.val.id: string
|-- friends.val.name: string

本体と紐づけられる id と、同じid内での順序の index が追加されています。
friends の要素がフラット化される際に、カラム名に .val が入っていることに注意が必要です。
ちなみに、relationalize()の第二引数のパス(staging_path)に指定したS3バケットを見てみると、 こちらのDynamicFrameのデータがCSVファイルで保存されていました。


        df_main = dfc.select("root").toDF()
        # もとのJSONのフィールドkeyにidがあるとバッティングしてしまうのでリネームしてから結合
        df_sub = dfc.select(obj_array_df_name).rename_field("id", "_flatten_id") \
                                              .rename_field("index", "_flatten_index") \
                                              .toDF()
        dyf = DynamicFrame.fromDF(df_main.join(df_sub, (df_main["`{}`".format(OBJ_ARRAY_COL)] == df_sub["_flatten_id"]), "left"),
                                  glue_context, "flattened")

以下のようにDynamicFrameでもJOINはできるのですが、 DynamicFrameは内部結合しかできないので、今回は上記のように一度 PysparkのDataframe に変換して左外部結合しています。

        dyf = dfc.select("root").join("`{}`".format(OBJ_ARRAY_COL), "_flatten_id",
                                      dfc.select(obj_array_df_name).rename_field("id", "_flatten_id")
                                                                   .rename_field("index", "_flatten_index"))

(↑DynamicFrameで結合する方式だと、「.」を含むフィールド名はバッククォートで括る必要がある場合があるようです。 これに気づけずにかなりハマリました…)

また、オブジェクト配列のDynamicFrameの方について、前述したように、 idindex というカラムを自動で作成してくれているのですが、 よくある名前なので結合後にバッティングすることがありそうです…
それを防ぐためにリネームしてから結合しています。

↓JOIN後のDynamicFrameのスキーマはこんな感じ。もうほぼ完成。

root
|-- id: string
|-- name: string
|-- profile.age: int
|-- profile.level: int
|-- friends: long
|-- extraField: string
|-- friends.val.id: string
|-- friends.val.name: string
|-- _flatten_id: long
|-- _flatten_index: int

    dyf = dyf.select_fields(col_order)

RelationalizeとJOINで余分なカラムができたのと、 TSVとして出力する場合はカラム順に冪等性を持たせた方が良いと思うので、 調整しています。

↓こんな感じのスキーマ(完成形)。

root
|-- id: string
|-- name: string
|-- profile.age: int
|-- friends.val.id: string
|-- friends.val.name: string

(必要ならこの辺のタイミングでカラム名を変えても良いかもしれません。)


    dyf.write("s3", {"path": "s3://{}/{}".format(OUTPUT_BUCKET, OUTPUT_PATH)},
              "csv", {"separator": "\t", "quoteChar": -1, "writeHeader": True})

TSVとしてDynamicFrameをS3上のファイルに書き出して終わり!

↓出力結果はこんな感じ。

id	name	profile.age	friends.val.id	friends.val.name
001	池田	17	002	山田
001	池田	17	003	竹田
007	吉川			
004	渡部	64	005	渡辺
004	渡部	64	006	渡邉

※実際には複数ファイルに出力されたのを、まとめて掲載しています。

おわりに

GlueかAthenaか、データカタログを定義するかしないか、辺りは悩ましいですね…
一周回ってDynamicFrameが好きになってきました。

関連情報/参考にさせていただいたページ