[Apache Hudi x Athena 新機能] Merge On ReadテーブルをAthenaでクエリする

2021.08.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データ・アナリティクス事業本部の森脇です。

前回に引き続き、Apache Hudi x Amazon Athenaの新機能を試してみます。

前回の記事はこちら:

[Apache Hudi] ブートストラップ機能で作成したテーブルを、Amazon Athenaでクエリする

今回は、Merge On Readテーブルのクエリを試してみます。

公式アナウンスでは「スナップショットクエリ」がサポートされた」と記載されていますが、Merge On Readテーブルの読み込み最適化クエリ(Read optimized query)がサポートされたようです。

試してみる

Merge On ReadテーブルをGlueデータカタログ連携し、テーブル作成,及びデータ取り込みを行います。

初期データが5 レコードでテーブルを作成し、その後数レコードupdateします。

Glueジョブの設定は、前回のブログと同じようにしました。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()

tableName = 'hudi_mor_athena_sample' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
  'hoodie.compact.inline': 'false',
  'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'upsert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数

  # データカタログ連携オプション(hive_sync)
  'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
  'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名
  'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
  'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名
  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',  # パーティションの方式を指定
  'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。
}


# 初期データでテーブル作成
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)


# update用データを作成
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.show()

# update
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

確認

ジョブ実行後、どのようにテーブルが作成されたか確認してみます。

Glueデータカタログ

Glueデータカタログを確認すると、テーブルが2つ作成されていました。

hudi_mor_athena_sampleという名前でテーブルを作成したのですが、同名のテーブルは存在しません。

代わりに「_rt」、「_ro」とサフィックスがついたテーブルが作成されました。

これらは、それぞれ「real time」と「read optimized」の略です。

つまり、「リアルタイムクエリ」を実行したい場合は「_rt」付きのテーブルに対して、

「読み込み最適化クエリ」を実行したい場合には「_ro」付きのテーブル対してクエリを実行する仕組みのようです。

クエリしてみる

hudi_mor_athena_sample_rtテーブル

SELECT * FROMで全件データを確認します。

"_hoodie_commit_time","_hoodie_commit_seqno","_hoodie_record_key","_hoodie_partition_path","_hoodie_file_name","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","rider","ts","uuid","contient","country","city"
"20210810051144","20210810051144_1_1","e3c58729-e0e8-42a5-9ba2-f30a3b40171c","asia/india/chennai","0aa71d9b-c144-48a1-8a95-950fa5f8a0e8-0","0.36519521355305173","0.9888075495133515","driver-385","0.013401540991535565","0.3794482769934313","18.56488085068272","asia/india/chennai","rider-385","1628447992245","e3c58729-e0e8-42a5-9ba2-f30a3b40171c","asia","india","chennai"
"20210810051144","20210810051144_0_2","8689f4d2-e140-449a-a980-9cd4f5dc21dc","americas/united_states/san_francisco","bd6cf2d3-046c-4c9d-a09b-41af123ead14-0","0.14516349705850584","0.12153670568058683","driver-385","0.03100772707494559","0.46798677607436234","7.50588760043035","americas/united_states/san_francisco","rider-385","1627969036256","8689f4d2-e140-449a-a980-9cd4f5dc21dc","americas","united_states","san_francisco"
"20210810051144","20210810051144_0_3","a14a02a8-1044-4796-b197-bbc71b73d77b","americas/united_states/san_francisco","bd6cf2d3-046c-4c9d-a09b-41af123ead14-0","0.8236411667430927","0.020856583634078385","driver-385","0.9835835198493069","0.37046357849997036","65.1058505660742","americas/united_states/san_francisco","rider-385","1628419874544","a14a02a8-1044-4796-b197-bbc71b73d77b","americas","united_states","san_francisco"
"20210810051115","20210810051115_0_1","dfabb998-2bb7-4447-a3f9-8ed75f422431","americas/brazil/sao_paulo","a297fb9f-fcda-4631-a1fb-a66a85cd5e35-0_0-24-100_20210810051115.parquet","0.6100070562136587","0.8779402295427752","driver-213","0.3407870505929602","0.5030798142293655","43.4923811219014","americas/brazil/sao_paulo","rider-213","1627968847502","dfabb998-2bb7-4447-a3f9-8ed75f422431","americas","brazil","sao_paulo"
"20210810051115","20210810051115_0_4","3b5767c1-9748-4f75-94ce-8de695b971b7","americas/brazil/sao_paulo","a297fb9f-fcda-4631-a1fb-a66a85cd5e35-0_0-24-100_20210810051115.parquet","0.4726905879569653","0.46157858450465483","driver-213","0.754803407008858","0.9671159942018241","34.158284716382845","americas/brazil/sao_paulo","rider-213","1628173269854","3b5767c1-9748-4f75-94ce-8de695b971b7","americas","brazil","sao_paulo"

最初のカラムを確認すると、コミットタイムが2種類あります。つまり、updateのデータが反映されているリアルタイムなデータをクエリすることができました。

また、このクエリには約3秒かかりました。

hudi_mor_athena_sample_roテーブル

こちらのテーブルも同様に、全件のデータを確認してみます。

"_hoodie_commit_time","_hoodie_commit_seqno","_hoodie_record_key","_hoodie_partition_path","_hoodie_file_name","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","rider","ts","uuid","contient","country","city"
"20210810051115","20210810051115_2_3","e3c58729-e0e8-42a5-9ba2-f30a3b40171c","asia/india/chennai","0aa71d9b-c144-48a1-8a95-950fa5f8a0e8-0_2-24-102_20210810051115.parquet","0.40613510977307","0.5644092139040959","driver-213","0.798706304941517","0.02698359227182834","17.851135255091155","asia/india/chennai","rider-213","1627996432612","e3c58729-e0e8-42a5-9ba2-f30a3b40171c","asia","india","chennai"
"20210810051115","20210810051115_0_1","dfabb998-2bb7-4447-a3f9-8ed75f422431","americas/brazil/sao_paulo","a297fb9f-fcda-4631-a1fb-a66a85cd5e35-0_0-24-100_20210810051115.parquet","0.6100070562136587","0.8779402295427752","driver-213","0.3407870505929602","0.5030798142293655","43.4923811219014","americas/brazil/sao_paulo","rider-213","1627968847502","dfabb998-2bb7-4447-a3f9-8ed75f422431","americas","brazil","sao_paulo"
"20210810051115","20210810051115_0_4","3b5767c1-9748-4f75-94ce-8de695b971b7","americas/brazil/sao_paulo","a297fb9f-fcda-4631-a1fb-a66a85cd5e35-0_0-24-100_20210810051115.parquet","0.4726905879569653","0.46157858450465483","driver-213","0.754803407008858","0.9671159942018241","34.158284716382845","americas/brazil/sao_paulo","rider-213","1628173269854","3b5767c1-9748-4f75-94ce-8de695b971b7","americas","brazil","sao_paulo"
"20210810051115","20210810051115_1_2","8689f4d2-e140-449a-a980-9cd4f5dc21dc","americas/united_states/san_francisco","bd6cf2d3-046c-4c9d-a09b-41af123ead14-0_1-24-101_20210810051115.parquet","0.21624150367601136","0.14285051259466197","driver-213","0.5890949624813784","0.0966823831927115","93.56018115236618","americas/united_states/san_francisco","rider-213","1628269675504","8689f4d2-e140-449a-a980-9cd4f5dc21dc","americas","united_states","san_francisco"
"20210810051115","20210810051115_1_5","a14a02a8-1044-4796-b197-bbc71b73d77b","americas/united_states/san_francisco","bd6cf2d3-046c-4c9d-a09b-41af123ead14-0_1-24-101_20210810051115.parquet","0.5731835407930634","0.4923479652912024","driver-213","0.08988581780930216","0.42520899698713666","64.27696295884016","americas/united_states/san_francisco","rider-213","1628058536816","a14a02a8-1044-4796-b197-bbc71b73d77b","americas","united_states","san_francisco"

最初のカラムを確認すると...1種類のデータしか存在しません。すなわちこちらのテーブルではupdate前のデータがクエリされました。

クエリには、約1.3秒かかりました。

この結果から、_roテーブルは読み込み最適化されたデータ、つまり、差分データが含まれないデータセットに対してクエリをが行えることが分かりました。

クエリ実行時間も約半分です。

まとめ

Athenaでも、Merge On Readの読み込み最適化クエリがサポートされたことを確認することができました!

最新のデータは必要ないが、素早く結果を得たい場合に役立ちそうです。

また、インクリメンタルクエリも今後サポートされると嬉しいなと思いました。

参照

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。