この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部、池田です。
JSONのフラット化でけっこう苦労したのでブログにします。
オブジェクトの配列(後述)が含まれているような、やや複雑なJSONをフラット化します。
↓基本の部分は以下の公式の記事です。
【 AWS Glue でリレーショナル変換後にピボットされたデータを使用するにはどうすればよいですか? 】
対象のJSON
データ構造
↓今回使用するサンプルファイルはこんな感じです。
{"id":"001","name":"池田","profile":{"age":17},"friends":[{"id":"002","name":"山田"},{"id":"003","name":"竹田"}]}
{"id":"004","name":"渡部","profile":{"age":64},"friends":[{"id":"005","name":"渡辺"},{"id":"006","name":"渡邉"}]}
{"id":"007","name":"吉川","profile":{"level":99},"extraField":"foo"}
JSON Lines(改行区切りのJSON)の形式のファイルを使用しています。 (通常のJSONファイルだとAmazon Athenaだと エラー 、AWS Glueでは正常にロードできませんでした。)
↓参考用に、通常のJSONにして整形したもの(一部省略)。
[
{
"id": "001",
"name": "池田",
"profile": {
"age": 17
},
"friends": [
{
"id": "002",
"name": "山田"
},
{
"id": "003",
"name": "竹田"
}
]
},
…
]
friends
部分が「 id
と name
をフィールドとして持つオブジェクト」の配列になっています。
3つめのid=007のレコードだけ、オブジェクト配列フィールドが無かったり、
フィールド名(key)が違ったり、検証用にイレギュラーなレコードにしています。
Glue Data Catalog定義
AWS Glue では、S3のファイルを直接ロードもできますが、 Data Catalog 経由の方法と両方実装してみます。
↓データカタログのテーブル定義はこんな感じ。 ( Amazon Athena からの実行を想定。)
CREATE EXTERNAL TABLE my_catalog.my_json_ext (
id STRING,
name STRING,
profile STRUCT<
age: INT
>,
friends ARRAY<
STRUCT<
id: STRING,
name: STRING
>
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-bucket/input_json/'
;
※ LOCATION
に指定しているバケットに前述のjsonlファイルを置いている想定です。
SerDe は2種類あるようですが、
今回の内容だとOpenX JSON SerDeの方が良さそうです。
余談:Athenaでのフラット化
せっかくデータカタログにテーブルを作成したので、Athenaからも参照してみます。
↓Athenaで実行したSQL。
SELECT id, name,
profile.age,
f.id AS f_id, f.name AS f_name
FROM my_catalog.my_json_ext,
UNNEST(friends) AS t(f)
UNION ALL
SELECT id, name,
profile.age,
NULL, NULL
FROM my_catalog.my_json_ext
WHERE friends IS NULL
;
↓結果。
"id","name","age","f_id","f_name"
"001","池田","17","002","山田"
"001","池田","17","003","竹田"
"004","渡部","64","005","渡辺"
"004","渡部","64","006","渡邉"
"007","吉川",,,
さくっとフラット化できましたね。
おまけのつもりで調べましたが、Athenaで十分ならGlue使わなくても良い気がします。たぶん。
UNIONしないきれいな書き方ができたら良かったのですが…
LEFT JOINしたら良いのかな…
(UNIONの後半のブロックを書かないと、JOINでid=007のレコードが落ちます。)
ちなみに、「SELECT * …」で取得しても、
データカタログに未定義のフィールド(id=007の extraField
や level
)は取得できませんでした。
Glue実装
↓最終的なコードは以下のようになりました。環境は AWS Glue 3.0(Spark 3.1.1, Python 3.7)
です。
クリックでコードを表示する/折りたたむ
import re
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
# パラメータ
DATABASE_NAME = "my_catalog" # データカタログDB名
TABLE_NAME = "my_json_ext" # データカタログテーブル名
# INPUT_BUCKET = "my-bucket" # 入力バケット名
# INPUT_PATH = "input_json/" # 入力バケットパス
OUTPUT_BUCKET = "my-bucket" # 出力バケット名
OUTPUT_PATH = "output_tsv/" # 出力バケットパス
COL_ORDER = ["id", "name", "profile.age", "friends.id", "friends.name"] # 出力カラム名とそれらの順番
OBJ_ARRAY_COL = "friends" # オブジェクト配列のフィールド名
GLUE_TEMP_STORAGE = "s3://my-bucket/tmp/glue_relationalize/"
def main():
glue_context = GlueContext(SparkContext.getOrCreate())
# パラメータ調整
col_order = [re.sub("^{}\\.".format(OBJ_ARRAY_COL), "{}.val.".format(OBJ_ARRAY_COL), i) if OBJ_ARRAY_COL in i else i for i in COL_ORDER] # オブジェクト配列は「.val」でアクセス
col_order = ["`{}`".format(i) if "." in i else i for i in col_order] # 「.」入りの列名はバッククウォート
print("output header: {}".format(col_order))
# A. DynamicFrame作成(Glueデータカタログから)
dyf = glue_context.create_dynamic_frame_from_catalog(
database=DATABASE_NAME, table_name=TABLE_NAME)
print("loading succeed: {}.{}".format(DATABASE_NAME, TABLE_NAME))
# B. DynamicFrame作成(S3のJSONから)
# dyf = glue_context.create_dynamic_frame_from_options(
# "s3", {"paths": ["s3://{}/{}".format(INPUT_BUCKET, INPUT_PATH)]}, "json")
# print("loading succeed: s3://{}/{}".format(INPUT_BUCKET, INPUT_PATH))
print("dynamic_frame row num: {}".format(dyf.count())) # debug
print("dynamic_frame schema:") # debug
dyf.printSchema() # debug
# オブジェクト配列がある場合はフラット化
if OBJ_ARRAY_COL:
dfc = dyf.relationalize("root", GLUE_TEMP_STORAGE)
print("root schema:") # debug
dfc.select("root").printSchema() # debug
# オブジェクト配列部分のdyf
obj_array_df_name = "root_{}".format(OBJ_ARRAY_COL)
print(obj_array_df_name + " schema:") # debug
dfc.select(obj_array_df_name).printSchema() # debug
# a. dyfは内部結合しかできないので、dfに変換して左外部結合する
df_main = dfc.select("root").toDF()
# もとのJSONのフィールドkeyにidがあるとバッティングしてしまうのでリネームしてから結合
df_sub = dfc.select(obj_array_df_name).rename_field("id", "_flatten_id") \
.rename_field("index", "_flatten_index") \
.toDF()
dyf = DynamicFrame.fromDF(df_main.join(df_sub, (df_main["`{}`".format(OBJ_ARRAY_COL)] == df_sub["_flatten_id"]), "left"),
glue_context, "flattened")
# b. オブジェクト配列部分が必須フィールドならdyfだけで変換できる
# dyf = dfc.select("root").join("`{}`".format(OBJ_ARRAY_COL), "_flatten_id",
# dfc.select(obj_array_df_name).rename_field("id", "_flatten_id")
# .rename_field("index", "_flatten_index"))
print("flattened row num: {}".format(dyf.count())) # debug
print("flattened schema:") # debug
dyf.printSchema() # debug
else:
print("OBJ_ARRAY_COL is falsy.")
# カラムの順番を整列
dyf = dyf.select_fields(col_order)
print("ordered schema:") # debug
dyf.printSchema() # debug
# 書き出し(TSV)
dyf.write("s3", {"path": "s3://{}/{}".format(OUTPUT_BUCKET, OUTPUT_PATH)},
"csv", {"separator": "\t", "quoteChar": -1, "writeHeader": True})
print("writing succeed: s3://{}/{}".format(OUTPUT_BUCKET, OUTPUT_PATH))
if __name__ == "__main__":
main()
詳細
ポイントだけ説明していきます。
dyf = glue_context.create_dynamic_frame_from_catalog(
database=DATABASE_NAME, table_name=TABLE_NAME)
前章で定義したデータカタログから、データをGlueの DynamicFrame にロードしています。
また、データカタログを使わない場合は、↓以下のようにS3のパスを指定して直接ロードできます。
dyf = glue_context.create_dynamic_frame_from_options(
"s3", {"paths": ["s3://{}/{}".format(INPUT_BUCKET, INPUT_PATH)]}, "json")
どちらの方法でロードしても、触った範囲では特に違いはありませんでした。
この時点でのDynamicFrameのスキーマは以下のようになっており、 データカタログ経由でも未定義のフィールドまでロードされていました。
root
|-- id: string
|-- name: string
|-- profile: struct
| |-- age: int
| |-- level: int
|-- friends: array
| |-- element: struct
| | |-- id: string
| | |-- name: string
|-- extraField: string
dfc = dyf.relationalize("root", GLUE_TEMP_STORAGE)
DynamicFrameの Relationalize
機能でフラット化とオブジェクト配列の部分を切り出します。
戻り値は DynamicFrameCollection
で、今回であれば以下のようなDynamicFramが2つ格納されています。
↓本体部分( dfc.select("root")
)。
root
|-- id: string
|-- name: string
|-- profile.age: int
|-- profile.level: int
|-- friends: long
|-- extraField: string
friends
がもう一方のDynamicFrameのidと紐づけられるように数値に変わっています。
同時に profile
部分もフラットになっています。
↓オブジェクト配列の部分( dfc.select("root_friends")
)。
root
|-- id: long
|-- index: int
|-- friends.val.id: string
|-- friends.val.name: string
本体と紐づけられる id
と、同じid内での順序の index
が追加されています。
friends
の要素がフラット化される際に、カラム名に .val
が入っていることに注意が必要です。
ちなみに、relationalize()の第二引数のパス(staging_path)に指定したS3バケットを見てみると、
こちらのDynamicFrameのデータがCSVファイルで保存されていました。
df_main = dfc.select("root").toDF()
# もとのJSONのフィールドkeyにidがあるとバッティングしてしまうのでリネームしてから結合
df_sub = dfc.select(obj_array_df_name).rename_field("id", "_flatten_id") \
.rename_field("index", "_flatten_index") \
.toDF()
dyf = DynamicFrame.fromDF(df_main.join(df_sub, (df_main["`{}`".format(OBJ_ARRAY_COL)] == df_sub["_flatten_id"]), "left"),
glue_context, "flattened")
以下のようにDynamicFrameでもJOINはできるのですが、 DynamicFrameは内部結合しかできないので、今回は上記のように一度 PysparkのDataframe に変換して左外部結合しています。
dyf = dfc.select("root").join("`{}`".format(OBJ_ARRAY_COL), "_flatten_id",
dfc.select(obj_array_df_name).rename_field("id", "_flatten_id")
.rename_field("index", "_flatten_index"))
(↑DynamicFrameで結合する方式だと、「.」を含むフィールド名はバッククォートで括る必要がある場合があるようです。 これに気づけずにかなりハマリました…)
また、オブジェクト配列のDynamicFrameの方について、前述したように、
id
と index
というカラムを自動で作成してくれているのですが、
よくある名前なので結合後にバッティングすることがありそうです…
それを防ぐためにリネームしてから結合しています。
↓JOIN後のDynamicFrameのスキーマはこんな感じ。もうほぼ完成。
root
|-- id: string
|-- name: string
|-- profile.age: int
|-- profile.level: int
|-- friends: long
|-- extraField: string
|-- friends.val.id: string
|-- friends.val.name: string
|-- _flatten_id: long
|-- _flatten_index: int
dyf = dyf.select_fields(col_order)
RelationalizeとJOINで余分なカラムができたのと、 TSVとして出力する場合はカラム順に冪等性を持たせた方が良いと思うので、 調整しています。
↓こんな感じのスキーマ(完成形)。
root
|-- id: string
|-- name: string
|-- profile.age: int
|-- friends.val.id: string
|-- friends.val.name: string
(必要ならこの辺のタイミングでカラム名を変えても良いかもしれません。)
dyf.write("s3", {"path": "s3://{}/{}".format(OUTPUT_BUCKET, OUTPUT_PATH)},
"csv", {"separator": "\t", "quoteChar": -1, "writeHeader": True})
TSVとしてDynamicFrameをS3上のファイルに書き出して終わり!
↓出力結果はこんな感じ。
id name profile.age friends.val.id friends.val.name
001 池田 17 002 山田
001 池田 17 003 竹田
007 吉川
004 渡部 64 005 渡辺
004 渡部 64 006 渡邉
※実際には複数ファイルに出力されたのを、まとめて掲載しています。
おわりに
GlueかAthenaか、データカタログを定義するかしないか、辺りは悩ましいですね…
一周回ってDynamicFrameが好きになってきました。