[Glueジョブ] Glueデータカタログ経由でCSVを読み込んで日付型にキャストする方法

2022.08.18

S3にあるCSVファイルをParquetとして出力したいという要件がありました。 CSVがただのテキストファイルであるのに対して、 Parquetはカラムごとに型の情報を持つので、各カラムには目的の型でデータを持たせたいです (使うときは型キャストしてから使ってね、というのはものすごくマヌケ...)。 今回はCSVファイルの容量が大きいことと、 Parquetに出力する前にちょっとした変換処理も入れる必要があったため、 Glueジョブを使ってこの変換をやってみました。

CSVデータを読んで型情報をつけたい、という用途であれば、 Glueデータカタログを利用するという方法が思い当たります。 Glueジョブではcreate_dynamic_frame_from_catalogというメソッドを使うことで、 Glueデータカタログを経由してDynamicFrameにデータを読み込むことができますので、 今回はこのメソッドを使ってみました。

...さて、実際にこの方法で上記の処理を実装して見ました。 割と簡単にできたように思ったのですが、出力したParquetの中身を見てみると、 Glueデータカタログでdate型やtimestamp型などと指定したカラムにもかかわらず、 それらがstring型になっていることに気づきました。 Parquetにする以上は型情報をきちんと付与したいので、これを行う方法を探って見ました。

事象の確認

改めて、何が問題かを確認してみます。

サンプルファイル

以下のようなサンプルを用意してS3に配置します。

sample.csv

sutoringu,seisu,hiduke,jikoku,seisu_small,seisubig
aaa,123,2022-06-23,2020-05-08 17:56:00,123,123

hidukejikokuのデータは、 Athenaにおいては(LazySimpleSerDeを使用した時に)想定通りの型キャストが成功するフォーマットになっています。 Athenaでキャストされる様子については以下のブログをご参照ください。

[Athena] OpenCSVSerDe使用時のデータ型の挙動をまとめてみた

データカタログ定義

Glueデータカタログを定義します。 Athenaで以下のクエリを実行します。

create_datacatalog_ng.sql

CREATE TABLE catalog_lazy_simple_serde(
  sutoringu string,
  seisu int,
  hiduke date,
  jikoku timestamp,
  seisu_small smallint,
  seisu_big bigint
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
LOCATION 's3://bucket-name/catalog/simple/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
);

hidukeはdate型、jikokuはtimestamp型など、目的の型を指定しています。

GlueジョブでDynamicFrameに読んでみる

GlueジョブでDynamicFrameに読み込んでみます。 (キャスト処理を含むコード全文は後ほど掲載しますので、ここではコードのイメージだけ示します)

dynamic_frame = create_dynamic_frame(glue_context, database_name, table_name)
print(f"[INFO] {dynamic_frame.count()}件をDynamicFrameに読み込みました")
print("[INFO] dynamic_frame.printSchema():")
dynamic_frame.printSchema()
dynamic_frame.show()
data_frame.write.mode("overwrite").format("parquet").option(
    "compression", "snappy"
).save("s3://bucket-name/catalog/simple-output/")

実行した時のログは以下のようになりました。

[INFO] 1件をDynamicFrameに読み込みました
[INFO] dynamic_frame.printSchema():
root
|-- sutoringu: string
|-- seisu: int
|-- hiduke: string
|-- jikoku: string
|-- seisu_small: string
|-- seisu_big: long

{
    "sutoringu": "aaa",
    "seisu": 123,
    "hiduke": "2022-06-23",
    "jikoku": "2020-05-08 17:56:00",
    "seisu_small": "123",
    "seisu_big": 123
}

seisuseisu_big以外がstring型になってしまっています。。。

ダメ元で吐き出されたParquetファイルを確認してみます。

$ parquet-tools schema --detail part-00000-d0936b9f-7fa1-4f46-941b-6a4bf8055d61-c000.snappy.parquet
(要点のみ)
message spark_schema {
  optional binary sutoringu (STRING);
  optional int32 seisu;
  optional binary hiduke (STRING);
  optional binary jikoku (STRING);
  optional binary seisu_small (STRING);
  optional int64 seisu_big;
}

やはりSTRINGとなっています。。。

Athenaでは期待通りのキャストが出来たのに、 create_dynamic_frame_from_catalogで読んだ場合には勝手が違うようです。

PySparkでキャストしてみる

色々と試行錯誤してみたのですが、 残念ながらお手軽に目的のデータ型として読み込む方法は見つかりませんでした。

仕方ないので、PySparkの処理で愚直にキャストをして行きます。 今回、少なくとも、DynamicFrameからDataFrameに変換してからキャストすることで実現ができることが確認できました。 DynamicFrameのままでも同様にできるかもしれませんが、今回は未検証です。

Glueデータカタログ定義

データカタログの定義です。 Athenaで以下のクエリを流して作成しました。

create_datacatalog_ok

CREATE TABLE catalog_lazy_simple_serde_string(
  sutoringu string,
  seisu int,
  hiduke string,
  jikoku string,
  seisu_small string,
  seisu_big bigint
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
LOCATION 's3://bucket-name/catalog/simple/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
);

ポイントとして、今度はintbigint以外のカラムは全てstringで定義しています。 なんでなのかはわからないのですが、実際こうしないと失敗することがありました。

キャストするGlueスクリプト

次のようなスクリプトでキャストができました。

convert_to_parquet_with_casted_columns.py

import datetime
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.types import ShortType, TimestampType, DateType
from pyspark.sql.functions import col, lit, udf

def create_dynamic_frame(glue_context, database_name, table_name):
    return glue_context.create_dynamic_frame_from_catalog(
        database=database_name,
        table_name=table_name,
    )

def transform_df(df):
    def _cast_date(date_str, date_format):
        if date_str is not None and date_str != "":
            return datetime.datetime.strptime(date_str, date_format).date()

    udf_date = udf(_cast_date, DateType())

    def _cast_ts(ts_str, ts_format):
        if ts_str is not None and ts_str != "":
            return datetime.datetime.strptime(ts_str, ts_format)

    udf_ts = udf(_cast_ts, TimestampType())

    # 日付のキャスト
    df = df.withColumn("hiduke", udf_date("hiduke", lit("%Y-%m-%d")))
    # 日時のキャスト
    df = df.withColumn("jikoku", udf_ts("jikoku", lit("%Y-%m-%d %H:%M:%S")))
    # smallintのキャスト
    df = df.withColumn("seisu_small", col("seisu_small").cast(ShortType()))
    return df

def main():
    glue_context = GlueContext(SparkContext.getOrCreate())
    database_name = "cm_hirano_spectrum"
    table_name = "catalog_lazy_simple_serde_string"

    print(f"[INFO] database_name: {database_name}")
    print(f"[INFO] table_name: {table_name}")

    dynamic_frame = create_dynamic_frame(glue_context, database_name, table_name)
    print(f"[INFO] {dynamic_frame.count()}件をDynamicFrameに読み込みました")
    print("[INFO] dynamic_frame.printSchema():")
    dynamic_frame.printSchema()
    dynamic_frame.show()
    data_frame = dynamic_frame.toDF()
    data_frame = transform_df(data_frame)
    data_frame.printSchema()
    data_frame.show()
    data_frame.write.mode("overwrite").format("parquet").option(
        "compression", "snappy"
    ).save("s3://bucket-name/catalog/simple-output/")

if __name__ == "__main__":
    main()

ポイントとしては以下です。

  • create_dynamic_frame_from_catalogでDynamicFrame読んで、すぐにDataFrameに変換している
  • date型とtimestamp型についてはキャストするコードをUDFに書いている
    • UDFの中は普通にdatetimeパッケージを使ってキャストしている
    • UDF呼び出しの第二引数にはPySparkのtypesクラスを渡している
  • smallintへのキャストはShortTypeを指定するだけでUDFは不要でした

実行ログ

[INFO] 1件をDynamicFrameに読み込みました
[INFO] dynamic_frame.printSchema():
root
|-- sutoringu: string
|-- seisu: int
|-- hiduke: string
|-- jikoku: string
|-- seisu_small: string
|-- seisu_big: long

{
    "sutoringu": "aaa",
    "seisu": 123,
    "hiduke": "2022-06-23",
    "jikoku": "2020-05-08 17:56:00",
    "seisu_small": "123",
    "seisu_big": 123
}

root
 |-- sutoringu: string (nullable = true)
 |-- seisu: integer (nullable = true)
 |-- hiduke: date (nullable = true)
 |-- jikoku: timestamp (nullable = true)
 |-- seisu_small: short (nullable = true)
 |-- seisu_big: long (nullable = true)

+---------+-----+----------+-------------------+-----------+---------+
|sutoringu|seisu|    hiduke|             jikoku|seisu_small|seisu_big|
+---------+-----+----------+-------------------+-----------+---------+
|      aaa|  123|2022-06-23|2020-05-08 17:56:00|        123|      123|
+---------+-----+----------+-------------------+-----------+---------+

キャスト後はdatetimestampという型になっていますね!

出力されたparquetを見てみます。

$ parquet-tools schema --detail part-00000-b23f3edf-7774-48b4-bca4-d671a47ba9b9-c000.snappy.parquet
message spark_schema {
  optional binary sutoringu (STRING);
  optional int32 seisu;
  optional int32 hiduke (DATE);
  optional int96 jikoku;
  optional int32 seisu_small (INTEGER(16,true));
  optional int64 seisu_big;
}

ちゃんと型が設定されていますね! date型の実体はint32、timestamp型の実体はint96のようです。

まとめ

Glueジョブで、CSVファイルをcreate_dynamic_frame_from_catalogでDynamicFrameに読み込むと int、longint以外はstringになってしまうようです。 これはLazySimpleSerDeOpenCSVSerDe共に同じ挙動でした。 なのでDataFrameに変換した後、PySparkのUDFを使ってカラムのキャストを行いました。

実現できた後に要素だけ取り出してみるとそこまで難しいなことをやっている訳ではないのですが、 UDFはSparkの各ワーカーノードで分散処理されているため、 コードに何かしらミスがあったとしても原因の特定するのが大変で思ったよりも苦労しました。 また、データカタログ側ではキャストする必要のあるカラムをstringで定義しておく必要がある点も特に納得できる理屈が見当たらず、 試行錯誤が必要でした。

単に型キャストをしたいだけという割には難易度が高いと感じました。 GlueデータカタログでCSVデータを扱う際にはこの辺の労力を見積もっておく必要がありますね。

参考情報