GlueJob上でSpark.SQLを通してMSCK REPAIR TABLEと同様の結果を得るまでに試行錯誤したこと
はじめに
Athenaで更新されていないPartitioned Tableを扱う場合には事前にMSCK REPAIR TABLE ${TABLE}
のSQL文を実行する必要があります。これを手作業で行っていましたが、該当のテーブル作成を実行しているGlue Jobにてまとめて行うことで自動化を検討しました。
結果として意図した動作に到達できましたが、追加した処理が想定どおりの動作にならなかったり、正常に完了せずエラーになり続けたりと悩まされました。振り返りとしてハマったことも含めて書き出してみました。
尚、コードはScalaを前提としていますが、Pythonを使う場合も大体似たような書き方になると思われます。
今回行ったこと
AWS Glue上で、ALTER TABLE table RECOVER PARTITIONS
文を実行するメソッドspark.catalog.recoverPartitions
によってパーティションのリカバリを図りました。
MSCK REPAIR TABLE
どころか全く異なるSQLになっていますが、Apache Spark上で全Partitionのリカバリを行うコマンドについては以下のコメントがついており、同等の結果が期待できることになります。
\** * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and * update the catalog. * * The syntax of this command is: * {{{ * ALTER TABLE table RECOVER PARTITIONS; * MSCK REPAIR TABLE table; * }}}
ただし、単純に上記メソッドを呼び出すだけでは上手く行かないケースもあります。以下、その紆余曲折です。
実行に至れるまで
今回は単純にテーブルへリカバリーをかければいいはず、という想定で始めています。故に、それ以外に解消できていない要素はないという思い込みです。
対象のTableがみつからない
最初は以下の一文の追加だけでした。セッションもあるし、普通に動くだろうという予想の上です。
spark.catalog.recoverPartitions(table)
ところが。
org.apache.spark.sql.catalyst.analysis.NoSuchTableException Table or view 'xxx' not found in database 'default';
default DBを対象にしているということが想定外です。
Spark.sqlで取り扱うデータベースを明示する
コード上では対象にするDataBaseの名前を収めている変数が存在していました。DataBaseのリストをfor文等で回している可能性も考えられましたが、DataBase名を固定prefixと変数混じりで直設定しているコードがありました。
default DBはGlue DataCatalog上で初期設定扱いのDBです。つまり、recoverPartitions
実行時のSparkセッションが取り扱うDBは初期設定のままかもしれません。となるとやるべきことは一つ。
spark.catalog.setCurrentDatabase(database)
結果。
org.apache.spark.sql.AnalysisException Database 'database' does not exist.;
そうきたか。
Sparkセッションの初期化時にconfigを付け足す
こうなると、Sparkセッションの初期化で何かもれている可能性が出てきます。とはいえ、以下のコードに過不足があるようにも見えません。
val spark: SparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
Sessionから何か他にコネクションを貼りなおすメソッドがないか等延々確認しましたが、それらしきものは見当たらず。困り果てた結果、検索してみたところ以下の記事がかかりました。
やっていることが今回の私のケースと完全に同じです。セッションを作成する際に、GlueのDataCatalogをSpark SQLで扱うための設定を追加する必要があったわけです。
つまり、初期化コードは以下のようになります。
val spark: SparkSession = SparkSession.builder .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") .enableHiveSupport.getOrCreate
Partitonedの切り分け
ではこれで問題なし、というわけでもありませんでした。recoverPartitions
がTableのPartition有無関係なく実行していたためです。
org.apache.spark.sql.AnalysisException Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned tables:
database
.table
;
今回はPartitionedの場合にベースを収めている配列があったため、配列内に要素があるかどうかを基準にしてみました。
if(catalog.databaseExists(database) && partitionedBy.length > 0) { … }
通して行う
上記の過程を得た結果のコードが以下になります。データベースの有無チェックは余計だったかもしれません。
val spark: SparkSession = SparkSession.builder .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") .enableHiveSupport.getOrCreate val catalog = spark.catalog if(catalog.databaseExists(database) && partitionedBy.length > 0) { catalog.setCurrentDatabase(database) catalog.recoverPartitions(table) }
今回発生した修正は以下の通り。
- spark SQLでGlue DataCatalogを扱うための種別設定の追加
spark.catalog
で操作するGlue上のDatabase指定recoverPartitions
でPartitonedTableのリカバリー
Glue上のDatabase指定については、他の余計なDatabaseを全て削除済みであれば不要かもしれません。
ALTER TABLE table RECOVER PARTITIONS
については以下の点で判定が必要です。
- 対象のテーブルがPartitionedでなければエラーになる
あとがき
Glue Jobそのものの実行時間も大きくかかるため、一つ一つ修正しては実行にて相当の日数がかかっていました。Athenaでのsql実行という手もありましたが、Glue(Spark)は専用に手段が用意されていること、設計から見て意図が分かりやすくなる点からチャレンジしてみました。
似たようなトライをして上手くいかなかった人の参考になれば幸いです。
参考URL
- apache spark - Is there a method in Pyspark equivalent to SQL's MSCK REPAIR TABLE - Stack Overflow
- Using the AWS Glue Data Catalog as the Metastore for Spark SQL - Amazon EMR
- AWS Glue Data Catalog Support for Spark SQL Jobs - AWS Glue
- spark/ddl.scala at b5e4b8c73e10743eef4d35b6e82053a5a065b2ed · apache/spark