AWS上でApache Hudiを動かし、Glueデータカタログと連携する
データアナリティクス事業本部の森脇です。
以前の記事にて、AWS上でApache Hudiを動かす方法を試しました。
今回はApache HudiとGlueデータカタログを連携させてみます。
Glueデータカタログと連携することで、「Apache Hudiでテーブルを作成し、Amazon Athenaで参照する」といった使い方が可能となります。
早速試してみます。
基本方針
連携するためには、以下2つの設定を変更します。
- AWS Glueのジョブ設定
- Apache Hudiのoption
AWS Glueジョブ設定には、「GlueデータカタログをHiveメタストアとして利用する」オプションがあります。
また、Apache Hudiには、「Hiveメタストアと連携する」オプションが存在します。
これらを両方利用することで、Glueデータカタログと連携することが可能です。
試してみる
以前の記事で検証した環境をベースに利用します。
事前に準備するAWSリソースなどは、上記を参照ください。
ベース設定からいくつか変更を行い検証します。
IAMロール
Glueデータカタログにアクセスできるようにするため、「glue:*」を許可するようポリシーを更新します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:*", "logs:*", "glue:*" ], "Resource": "*" } ] }
Glueジョブ
前回作成したジョブの「ジョブの編集」をクリックし、「ジョブパラメーター」設定を追加します。
キー名 | 値 |
---|---|
--enable-glue-datacatalog | t |
この設定で、GlueジョブがGlueデータカタログをHiveメタストアとして使用するようになります。
スクリプト
ジョブのスクリプトを変更します。
Hiveメタストアと連携するために「hoodie.datasource.hive_sync.*」設定をoptionに指定します。
また、今回はテーブルへの書き込みだけ確認すればよいため、クエリ実行の処理は除きました。
以下のようになります。
import sys from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate() sc = spark.sparkContext glueContext = GlueContext(sc) job = Job(glueContext) job.init(args['JOB_NAME'], args) # クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成 dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator() inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10)) df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.show() tableName = 'hudi_sample_table' # テーブル名 bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット basePath = f's3://{bucketName}/{tableName}' # Hudiのオプション hudi_options = { 'hoodie.table.name': tableName, # テーブル名 # 書き込みオプション 'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名 'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名 'hoodie.datasource.write.table.name': tableName, # テーブル名 'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別 'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される) 'hoodie.upsert.shuffle.parallelism': 2, # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用) 'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数 # データカタログ連携オプション(hive_sync) 'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする 'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名 'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名 'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', # パーティションの方式を指定 'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。 } # データの書き込み # 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する df.write.format("hudi"). \ options(**hudi_options). \ mode("overwrite"). \ save(basePath) job.commit()
Glueジョブの実行
変更後のGlueジョブを実行し、正常終了したら完了です。 マネジメントコンソールでGlueデータカタログを見ると、「default」データベースに「hudi_sample_table」テーブルが作成されていることが確認できました。
パーティションも正しく認識されていますね。
この状態でAthenaを使ってみたところ、正しくクエリを実行することもできました!
まとめ
Apache Hudiを利用することで、テーブル作成時にGlueデータカタログに情報を登録することができました。
Athenaを使う際に便利そうですね。
今後もHudiの機能を試していきたいと思います。
参考
※Apache®、Apache Hudi、Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。