この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
S3にあるCSVファイルをParquetとして出力したいという要件がありました。 CSVがただのテキストファイルであるのに対して、 Parquetはカラムごとに型の情報を持つので、各カラムには目的の型でデータを持たせたいです (使うときは型キャストしてから使ってね、というのはものすごくマヌケ...)。 今回はCSVファイルの容量が大きいことと、 Parquetに出力する前にちょっとした変換処理も入れる必要があったため、 Glueジョブを使ってこの変換をやってみました。
CSVデータを読んで型情報をつけたい、という用途であれば、
Glueデータカタログを利用するという方法が思い当たります。
Glueジョブではcreate_dynamic_frame_from_catalog
というメソッドを使うことで、
Glueデータカタログを経由してDynamicFrameにデータを読み込むことができますので、
今回はこのメソッドを使ってみました。
...さて、実際にこの方法で上記の処理を実装して見ました。 割と簡単にできたように思ったのですが、出力したParquetの中身を見てみると、 Glueデータカタログでdate型やtimestamp型などと指定したカラムにもかかわらず、 それらがstring型になっていることに気づきました。 Parquetにする以上は型情報をきちんと付与したいので、これを行う方法を探って見ました。
事象の確認
改めて、何が問題かを確認してみます。
サンプルファイル
以下のようなサンプルを用意してS3に配置します。
sample.csv
sutoringu,seisu,hiduke,jikoku,seisu_small,seisubig
aaa,123,2022-06-23,2020-05-08 17:56:00,123,123
hiduke
とjikoku
のデータは、
Athenaにおいては(LazySimpleSerDe
を使用した時に)想定通りの型キャストが成功するフォーマットになっています。
Athenaでキャストされる様子については以下のブログをご参照ください。
データカタログ定義
Glueデータカタログを定義します。 Athenaで以下のクエリを実行します。
create_datacatalog_ng.sql
CREATE TABLE catalog_lazy_simple_serde(
sutoringu string,
seisu int,
hiduke date,
jikoku timestamp,
seisu_small smallint,
seisu_big bigint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://bucket-name/catalog/simple/'
TBLPROPERTIES (
'has_encrypted_data'='false',
'skip.header.line.count'='1'
);
hiduke
はdate型、jikoku
はtimestamp型など、目的の型を指定しています。
GlueジョブでDynamicFrameに読んでみる
GlueジョブでDynamicFrameに読み込んでみます。 (キャスト処理を含むコード全文は後ほど掲載しますので、ここではコードのイメージだけ示します)
dynamic_frame = create_dynamic_frame(glue_context, database_name, table_name)
print(f"[INFO] {dynamic_frame.count()}件をDynamicFrameに読み込みました")
print("[INFO] dynamic_frame.printSchema():")
dynamic_frame.printSchema()
dynamic_frame.show()
data_frame.write.mode("overwrite").format("parquet").option(
"compression", "snappy"
).save("s3://bucket-name/catalog/simple-output/")
実行した時のログは以下のようになりました。
[INFO] 1件をDynamicFrameに読み込みました
[INFO] dynamic_frame.printSchema():
root
|-- sutoringu: string
|-- seisu: int
|-- hiduke: string
|-- jikoku: string
|-- seisu_small: string
|-- seisu_big: long
{
"sutoringu": "aaa",
"seisu": 123,
"hiduke": "2022-06-23",
"jikoku": "2020-05-08 17:56:00",
"seisu_small": "123",
"seisu_big": 123
}
seisu
とseisu_big
以外がstring型になってしまっています。。。
ダメ元で吐き出されたParquetファイルを確認してみます。
$ parquet-tools schema --detail part-00000-d0936b9f-7fa1-4f46-941b-6a4bf8055d61-c000.snappy.parquet
(要点のみ)
message spark_schema {
optional binary sutoringu (STRING);
optional int32 seisu;
optional binary hiduke (STRING);
optional binary jikoku (STRING);
optional binary seisu_small (STRING);
optional int64 seisu_big;
}
やはりSTRING
となっています。。。
Athenaでは期待通りのキャストが出来たのに、
create_dynamic_frame_from_catalog
で読んだ場合には勝手が違うようです。
PySparkでキャストしてみる
色々と試行錯誤してみたのですが、 残念ながらお手軽に目的のデータ型として読み込む方法は見つかりませんでした。
仕方ないので、PySparkの処理で愚直にキャストをして行きます。 今回、少なくとも、DynamicFrameからDataFrameに変換してからキャストすることで実現ができることが確認できました。 DynamicFrameのままでも同様にできるかもしれませんが、今回は未検証です。
Glueデータカタログ定義
データカタログの定義です。 Athenaで以下のクエリを流して作成しました。
create_datacatalog_ok
CREATE TABLE catalog_lazy_simple_serde_string(
sutoringu string,
seisu int,
hiduke string,
jikoku string,
seisu_small string,
seisu_big bigint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://bucket-name/catalog/simple/'
TBLPROPERTIES (
'has_encrypted_data'='false',
'skip.header.line.count'='1'
);
ポイントとして、今度はint
、bigint
以外のカラムは全てstring
で定義しています。
なんでなのかはわからないのですが、実際こうしないと失敗することがありました。
キャストするGlueスクリプト
次のようなスクリプトでキャストができました。
convert_to_parquet_with_casted_columns.py
import datetime
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.types import ShortType, TimestampType, DateType
from pyspark.sql.functions import col, lit, udf
def create_dynamic_frame(glue_context, database_name, table_name):
return glue_context.create_dynamic_frame_from_catalog(
database=database_name,
table_name=table_name,
)
def transform_df(df):
def _cast_date(date_str, date_format):
if date_str is not None and date_str != "":
return datetime.datetime.strptime(date_str, date_format).date()
udf_date = udf(_cast_date, DateType())
def _cast_ts(ts_str, ts_format):
if ts_str is not None and ts_str != "":
return datetime.datetime.strptime(ts_str, ts_format)
udf_ts = udf(_cast_ts, TimestampType())
# 日付のキャスト
df = df.withColumn("hiduke", udf_date("hiduke", lit("%Y-%m-%d")))
# 日時のキャスト
df = df.withColumn("jikoku", udf_ts("jikoku", lit("%Y-%m-%d %H:%M:%S")))
# smallintのキャスト
df = df.withColumn("seisu_small", col("seisu_small").cast(ShortType()))
return df
def main():
glue_context = GlueContext(SparkContext.getOrCreate())
database_name = "cm_hirano_spectrum"
table_name = "catalog_lazy_simple_serde_string"
print(f"[INFO] database_name: {database_name}")
print(f"[INFO] table_name: {table_name}")
dynamic_frame = create_dynamic_frame(glue_context, database_name, table_name)
print(f"[INFO] {dynamic_frame.count()}件をDynamicFrameに読み込みました")
print("[INFO] dynamic_frame.printSchema():")
dynamic_frame.printSchema()
dynamic_frame.show()
data_frame = dynamic_frame.toDF()
data_frame = transform_df(data_frame)
data_frame.printSchema()
data_frame.show()
data_frame.write.mode("overwrite").format("parquet").option(
"compression", "snappy"
).save("s3://bucket-name/catalog/simple-output/")
if __name__ == "__main__":
main()
ポイントとしては以下です。
create_dynamic_frame_from_catalog
でDynamicFrame読んで、すぐにDataFrameに変換している- date型とtimestamp型についてはキャストするコードをUDFに書いている
- UDFの中は普通にdatetimeパッケージを使ってキャストしている
- UDF呼び出しの第二引数にはPySparkのtypesクラスを渡している
- smallintへのキャストは
ShortType
を指定するだけでUDFは不要でした
実行ログ
[INFO] 1件をDynamicFrameに読み込みました
[INFO] dynamic_frame.printSchema():
root
|-- sutoringu: string
|-- seisu: int
|-- hiduke: string
|-- jikoku: string
|-- seisu_small: string
|-- seisu_big: long
{
"sutoringu": "aaa",
"seisu": 123,
"hiduke": "2022-06-23",
"jikoku": "2020-05-08 17:56:00",
"seisu_small": "123",
"seisu_big": 123
}
root
|-- sutoringu: string (nullable = true)
|-- seisu: integer (nullable = true)
|-- hiduke: date (nullable = true)
|-- jikoku: timestamp (nullable = true)
|-- seisu_small: short (nullable = true)
|-- seisu_big: long (nullable = true)
+---------+-----+----------+-------------------+-----------+---------+
|sutoringu|seisu| hiduke| jikoku|seisu_small|seisu_big|
+---------+-----+----------+-------------------+-----------+---------+
| aaa| 123|2022-06-23|2020-05-08 17:56:00| 123| 123|
+---------+-----+----------+-------------------+-----------+---------+
キャスト後はdate
やtimestamp
という型になっていますね!
出力されたparquetを見てみます。
$ parquet-tools schema --detail part-00000-b23f3edf-7774-48b4-bca4-d671a47ba9b9-c000.snappy.parquet
message spark_schema {
optional binary sutoringu (STRING);
optional int32 seisu;
optional int32 hiduke (DATE);
optional int96 jikoku;
optional int32 seisu_small (INTEGER(16,true));
optional int64 seisu_big;
}
ちゃんと型が設定されていますね! date型の実体はint32、timestamp型の実体はint96のようです。
まとめ
Glueジョブで、CSVファイルをcreate_dynamic_frame_from_catalog
でDynamicFrameに読み込むと
int、longint以外はstringになってしまうようです。
これはLazySimpleSerDe
、OpenCSVSerDe
共に同じ挙動でした。
なのでDataFrameに変換した後、PySparkのUDFを使ってカラムのキャストを行いました。
実現できた後に要素だけ取り出してみるとそこまで難しいなことをやっている訳ではないのですが、 UDFはSparkの各ワーカーノードで分散処理されているため、 コードに何かしらミスがあったとしても原因の特定するのが大変で思ったよりも苦労しました。 また、データカタログ側ではキャストする必要のあるカラムをstringで定義しておく必要がある点も特に納得できる理屈が見当たらず、 試行錯誤が必要でした。
単に型キャストをしたいだけという割には難易度が高いと感じました。 GlueデータカタログでCSVデータを扱う際にはこの辺の労力を見積もっておく必要がありますね。