AWS Glue DynamicFrameが0レコードでスキーマが取得できない場合の対策と注意点

2021.07.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部コンサルティングチームの石川です。create_dynamic_frame.from_catalog()で取得したDynamicFrameのレコード数が0件の場合にスキーマが取得できず、後続のSpark SQLがカラム名を解決できないため、エラーが発生します。本日は、この問題の対策と注意点について解説します。

レコード数が0件の場合に何が起こるのか

Glue ETL Jobのcreate_dynamic_frame.from_catalog()は、Glue データカタログに登録されているテーブルのデータからDynamicFrameを生成して返す関数です。create_dynamic_frame.from_catalog()では実際のデータからスキーマを取得してDynaicFrameを生成するので、レコードが0件でデータが存在しない場合、スキーマも取得することができない動作となります。

これらの制限に対応するために、AWSGlue を導入するDynamicFrame。DynamicFrame は、DataFrame と似ていますが、各レコードが自己記述できるため、最初はスキーマは必要ありません。代わりに、AWSGlue は必要に応じてオンザフライでスキーマを計算し、選択 (または共用) タイプを使用してスキーマの不一致を明示的にエンコードします。

よって、指定したテーブルが0件の場合、生成されるDynamicFrameは、メタ情報のない0レコードのDynamicFrameが生成されます。

そのため、カラム指定でSparkSQLを実行すると「そんなカラムがない」というエラーが発生します。

しかし、取得したテーブルが0件であることは日常的にあります。push_down_predicateを指定した場合は、更に0件になる可能性は高くなります。念の為、0レコードのParquetを作成してメタ情報が取れないか確認しましたが回避できません。

案1:count()でレコード数を取得して回避

Dataframeのcount()を用いて、0レコード以上なら後続のSparkSQLの実行します。

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "zero_record_table", transformation_ctx = "datasource0")
if datasource0.toDF().count() > 0:
    run_query()
:

案2:GlueデータカタログをHiveメタストアとして使用して回避

上記の方法では、複数のテーブルを結合したり、複数のSELECTをUNIONするユースケースでは、レコードの有無でクエリをハンドリングするのは煩雑という課題が残ります。

そこで、GlueデータカタログをHiveメタストアとして使用する方法を紹介します。AWS Glueでは、ジョブの実行(Spark Submit)のJob parametersに--enable-glue-datacatalogを指定すると、Glueデータカタログからスキーマを取得するように変更できます。

  • --enable-glue-datacatalog— 使用できるようにします。AWS Apache Spark Hive メタストアとしてのGlue データカタログ。この機能を有効にするには、キーを指定します。値は必要ありません。

ジョブの設定

新規のジョブをマネージメントコンソールで作成する場合、「Use Glue data catalog as the Hive metastore」の設定を有効化してデータカタログからスキーマを取得できるようにします。

既存のジョブの変更をマネジメントコンソールで設定変更する場合、ジョブパラメータで--enable-glue-datacatalogの値に適当な文字列を設定して一度保存してから、再度ジョブの設定を開き値を空にして保存します。

SaprkSQLの実行

createOrReplaceTempView()にて、Dataframeとテーブルを関連付けする代わりに以下の何れかの方法でテーブルにアクセスします。

DBを指定してSpark SQLを実行する方法

デフォルトのDBを設定した後にクエリを実行します。デフォルトのDBを設定しないとテーブル名が解決できないためエラーになります。

spark.sql("USE mydb")
df_sql = spark.sql("select id, total from sst_zr_pt")

TableにDBを付与してSpark SQLを実行する方法

の形式で、対象のテーブルを指定します。

df_sql = spark.sql("select id, total from mydb.sst_zr_pt")

注意事項

IAMロールにglue:CreateDatabase権限が必要

GlueデータカタログをHiveメタストアとして使用するには、ジョブの実行時に指定するIAMロールにglue:CreateDatabase権限を付与しなければなりません。

パーティションのフォルダが存在しない場合にエラーになる

create_dynamic_frame.from_catalog()は、パーティションを設定したのに(ALTER TABLE ADD PARTITIONしたのに)パーティションフォルダがないとエラーになります。フォルダが存在すれば、Parquetファイルが無くても問題ありません。

新しくデータが連携されたタイミングでパーティションを作成するのは、運用の中で行うのは面倒です。そのため、パーティションを事前に一括作成する方法がありますが、エラーになってしまうのでこの方法は利用できません。

まとめ

create_dynamic_frame.from_catalog()では実際のデータからスキーマを取得してDynaicFrameを生成するので、レコードが0件でデータが存在しない場合、スキーマも取得することができない動作となります。

回避策は、count()でレコード数を取得してハンドリングするか、GlueデータカタログをHiveメタストアとして使用してスキーマ情報を取得する何れかになります。Glue StudioのSparkSQL Transformが使えるようになった今、積極的にSparkSQLを活用して、素早くELTを実装したり、メンテナンスを向上させたいので、後者を選択することがますます増えることでしょう。

GlueデータカタログをHiveメタストアとして使用してスキーマ情報を取得する場合、パーティションのフォルダが存在しない場合にエラーになリますのでご注意ください。今後、Athenaでパーティション設定を自動化するならPartition Projectionを活用するのが良いでしょう。

デフォルト動作はSparkSQLがエラーになることでデータレイクにデータが連携されなかったということを気づくことができるという点では、メリットもいえるのでうまく使い分けていただけると幸いです。