[Apache Hudi] ブートストラップ機能で作成したテーブルを、Amazon Athenaでクエリする

2021.07.19

データ・アナリティクス事業本部の森脇です。

先日、Amazon AthenaにてApache Hudi統合がアップグレードされたとアナウンスがありました。

Amazon Athena expands Apache Hudi support

Amazon Athena has updated its integration with Apache Hudi to support new features and the latest 0.8.0 community release. Hudi is an open-source data management framework used to simplify incremental data processing in S3 data lakes. The updated integration enables you to use Athena to query Hudi 0.8.0 tables managed via Amazon EMR, Apache Spark, Apache Hive or other compatible services and includes new support for snapshot queries and reading bootstrapped tables.

Apache Hudiの0.8.0に対応し、今まで動作しなかった

  • ブートストラップテーブルのRead
  • スナップショットクエリ

がサポートされたようです。

以前ブートストラップ機能を試しましたが、当時はAthenaでブートストラップテーブルをクエリすると、正常にデータを参照できませんでした。

以前の記事はこちら:

[Apache Hudi]Bootsrap機能を使ってデータを登録する

正式にサポートされたようなので、改めて試してみます。

検証用Glueジョブの作成

検証用のGlueジョブを作成します。

基本的にデフォルト設定でOKですが、以下2つの設定がポイントです。

依存JARパス

Mavenからsparkバンドルの0.8.0のHudiをダウンロードし、S3にアップロードして利用します。

カタログオプション

「Use Glue data catalog as the Hive metastore」にチェックを入れます。

このチェックを入れることで、Glueデータカタログとの連携が可能です。

試してみる

前回と同じデータを利用して試してみます。

プログラム:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

tableName = 'hudi_bootstrap_table_new2' # テーブル名
inputPath = "s3://apache-hudi-bootstrap-test-data/test-data/" # parquetファイルが格納されたバケットパス
hudiTablePath = f"s3://apache-hudi-bootstrap-test-meta/{tableName}/" # メタデータ(Hudiテーブル)の作成先。


# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'id', # レコードキーのカラム名
  'hoodie.datasource.write.precombine.field': 'date', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'bootstrap', # ブートストラップにて書き込み

  # ブートストラップオプション
  'hoodie.bootstrap.base.path': inputPath, # データファイルのパス
  'hoodie.bootstrap.keygen.class': 'org.apache.hudi.keygen.SimpleKeyGenerator', # キージェネレーター
 
  

  # データカタログ連携オプション(hive_sync)
  'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
  'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名
  'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
  'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。
  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor' # パーティション無し
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
schema = StructType([])
emptyDataFrame = spark.createDataFrame(sc.emptyRDD(), schema)
emptyDataFrame.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(hudiTablePath)

job.commit()

実行後にAthenaでクエリを実行してみると...結果が正しく表示されるようになりました!

まとめ

Amazon Athenaのアップデートによって、今まで対応していなかったHudiの機能が利用できるようになりました。

ブートストラップ機能は個人的に非常に便利だと思っていたので、Athenaを併用できるようになったのは非常に良いアップデートだと思いました。

別の新機能も順次試していこうと思います。

参照

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。