[Apache Hudi] Copy On Writeテーブルをクエリする

2021.07.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データ・アナリティクス事業本部の森脇です。

Apache Hudiでは、用途に応じた何種類かのクエリをサポートしています。

Copy On Writeテーブルでは以下の2クエリをサポートしています。

  • スナップショットクエリ(Snapshot Query)
  • インクリメンタルクエリ(Incremental Query)

それぞれの特性/用途を理解するため試してみました。

データ作成

まずは、クエリを試すためのテーブルを作成します。

今回は2レコードづつ10分毎に計2時間データを挿入したテーブルを用意しました。

データは公式のクイックスタートに記載されていた、Apache Hudi自身のサンプルデータ作成機能を利用しました。

hudi_query_test_morテーブル:

20210709012107,20210709012107_0_1,0b4c7277-69d2-40b3-8a68-3e87b53f39c6,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-94_20210709012107.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,0b4c7277-69d2-40b3-8a68-3e87b53f39c6
20210709012107,20210709012107_0_2,deb24798-aa25-42ff-af8e-f151fa1666bd,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-94_20210709012107.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,deb24798-aa25-42ff-af8e-f151fa1666bd
20210709013053,20210709013053_0_1,3c370dc4-a1c3-4ae6-b63b-a6110e10144f,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709013053.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,3c370dc4-a1c3-4ae6-b63b-a6110e10144f
20210709013053,20210709013053_0_2,d7f201d3-0f18-4a5b-96cd-efce8704ded5,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709013053.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,d7f201d3-0f18-4a5b-96cd-efce8704ded5
20210709014144,20210709014144_0_1,85b64f08-2426-4bfc-8821-8038d9cf5384,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709014144.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,85b64f08-2426-4bfc-8821-8038d9cf5384
20210709014144,20210709014144_0_2,42b96652-c863-499e-a349-2b6f18f65d37,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709014144.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,42b96652-c863-499e-a349-2b6f18f65d37
20210709015128,20210709015128_0_1,7511187f-43da-4300-a9d2-9638ca445cd0,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709015128.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,7511187f-43da-4300-a9d2-9638ca445cd0
20210709015128,20210709015128_0_2,9851813b-4891-4e27-b87b-5aa6c157d5c6,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709015128.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,9851813b-4891-4e27-b87b-5aa6c157d5c6
20210709020043,20210709020043_0_1,db1137ec-88ec-454e-8b86-e85a917d48c7,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709020043.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,db1137ec-88ec-454e-8b86-e85a917d48c7
20210709020043,20210709020043_0_2,7783bde9-464a-4cb3-bfc9-a72144d57a56,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709020043.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,7783bde9-464a-4cb3-bfc9-a72144d57a56
20210709021233,20210709021233_0_1,e897a02f-adae-4827-9cdb-d5a401971438,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709021233.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,e897a02f-adae-4827-9cdb-d5a401971438
20210709021233,20210709021233_0_2,d45c2565-289d-49a3-b723-8674025bf013,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709021233.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,d45c2565-289d-49a3-b723-8674025bf013
20210709022124,20210709022124_0_1,c8410328-a91a-439b-bee9-b1f09b8c6bd3,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709022124.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,c8410328-a91a-439b-bee9-b1f09b8c6bd3
20210709022124,20210709022124_0_2,3a1ecd61-aed0-44af-b47b-f11f6bbb1cde,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709022124.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,3a1ecd61-aed0-44af-b47b-f11f6bbb1cde
20210709023125,20210709023125_0_1,f83eb882-b269-43aa-aa06-c07ffdfaec36,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709023125.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,f83eb882-b269-43aa-aa06-c07ffdfaec36
20210709023125,20210709023125_0_2,8e463395-d353-4d9e-8ad5-93a6622ab3e3,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709023125.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,8e463395-d353-4d9e-8ad5-93a6622ab3e3
20210709024118,20210709024118_0_1,8f237f09-c641-4ed7-bc65-711b150bbd35,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709024118.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,8f237f09-c641-4ed7-bc65-711b150bbd35
20210709024118,20210709024118_0_2,45f102fc-168e-458c-9531-697af8f65009,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709024118.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,45f102fc-168e-458c-9531-697af8f65009
20210709025158,20210709025158_0_1,1310c3d4-4676-44fd-99af-787cfa56bf5c,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709025158.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,1310c3d4-4676-44fd-99af-787cfa56bf5c
20210709025158,20210709025158_0_2,8b975d1c-9ed6-4ce1-81cc-d95146b304ca,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709025158.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,8b975d1c-9ed6-4ce1-81cc-d95146b304ca
20210709030044,20210709030044_0_1,51487979-37a5-4a86-bfa2-b96e77af9e44,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709030044.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,51487979-37a5-4a86-bfa2-b96e77af9e44
20210709030044,20210709030044_0_2,e790c957-0392-43af-89d7-7c4e3a2b2ec4,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709030044.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,e790c957-0392-43af-89d7-7c4e3a2b2ec4
20210709031236,20210709031236_0_1,92fb35f5-fb05-4ac8-b3a9-bd69472073b5,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709031236.parquet,0.4726905879569653,0.46157858450465483,driver-213,0.754803407008858,0.9671159942018241,34.158284716382845,americas/brazil/sao_paulo,rider-213,0.0,92fb35f5-fb05-4ac8-b3a9-bd69472073b5
20210709031236,20210709031236_0_2,89808432-5b3c-4fed-b1c9-8d9cf164cf8f,americas/brazil/sao_paulo,5c7d6672-d4ff-4bb5-9ba1-e0139b98732a-0_0-22-97_20210709031236.parquet,0.6100070562136587,0.8779402295427752,driver-213,0.3407870505929602,0.5030798142293655,43.4923811219014,americas/brazil/sao_paulo,rider-213,0.0,89808432-5b3c-4fed-b1c9-8d9cf164cf8f

スナップショットクエリ(Snapshot Query)

最新の状態をクエリできる、最も基本的なクエリです。

hoodie.datasource.query.typeオプションにsnapshotを指定します。(オプション省略時のデフォルト値でもあります)

先程のテーブルにクエリを実行して、結果を確認します。 (見やすさのため、2列のみ表示するようにしています)

ソースコード:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_query_test_mor' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'

# スナップショットクエリ
hudi_options = {
  'table.type': 'COPY_ON_WRITE',
  'hoodie.datasource.query.type	': 'snapshot',
}
df = spark.read.format("hudi"). \
  options(**hudi_options). \
  load(f'{basePath}/*/*/*')

df.createOrReplaceTempView("snapshot_query_table")
spark.sql("select _hoodie_commit_time, uuid from snapshot_query_table order by 1").show(30)

job.commit()

結果はこのようになりました。

+-------------------+--------------------+
|_hoodie_commit_time|                uuid|
+-------------------+--------------------+
|     20210709012107|deb24798-aa25-42f...|
|     20210709012107|0b4c7277-69d2-40b...|
|     20210709013053|3c370dc4-a1c3-4ae...|
|     20210709013053|d7f201d3-0f18-4a5...|
|     20210709014144|85b64f08-2426-4bf...|
|     20210709014144|42b96652-c863-499...|
|     20210709015128|9851813b-4891-4e2...|
|     20210709015128|7511187f-43da-430...|
|     20210709020043|db1137ec-88ec-454...|
|     20210709020043|7783bde9-464a-4cb...|
|     20210709021233|d45c2565-289d-49a...|
|     20210709021233|e897a02f-adae-482...|
|     20210709022124|c8410328-a91a-439...|
|     20210709022124|3a1ecd61-aed0-44a...|
|     20210709023125|f83eb882-b269-43a...|
|     20210709023125|8e463395-d353-4d9...|
|     20210709024118|45f102fc-168e-458...|
|     20210709024118|8f237f09-c641-4ed...|
|     20210709025158|1310c3d4-4676-44f...|
|     20210709025158|8b975d1c-9ed6-4ce...|
|     20210709030044|51487979-37a5-4a8...|
|     20210709030044|e790c957-0392-43a...|
|     20210709031236|92fb35f5-fb05-4ac...|
|     20210709031236|89808432-5b3c-4fe...|
+-------------------+--------------------+

最新のデータである全24レコードすべてが取得できています。

インクリメンタルクエリ(Incremental Query)

スナップショットクエリが最新のデータをクエリできるのに対し、インクリメンタルクエリは開始時間/終了時間を指定しクエリを実行します。

このクエリを利用することで、特定期間のデータを取得することが可能です。

インクリメンタルクエリの場合、hoodie.datasource.query.typeオプションにincrementalを指定します。

また、hoodie.datasource.read.begin.instanttimeオプションに開始時間を、hoodie.datasource.read.end.instanttimeオプションには終了時刻を指定します。

この開始時間と終了時間の指定によって、「一週間前時点のデータ」や、「直近一週間の間に追加されたデータ」等をクエリすることが可能です。

開始時間は必須オプションです。時間の代わりに「000」と指定すると、「最も早い時間」という意味になります。

開始時間と終了時間を両方指定したケースを、公式ドキュメントではPoint in time queryと呼んでいました。

試してみます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job


## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_query_test_mor' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'

# インクリメンタルクエリ

print("beginのみ指定し、2021/07/09 03:00:00 以降のデータを取得 ---")
hudi_options = {
  'table.type': 'COPY_ON_WRITE',
  "hoodie.datasource.query.type": "incremental",
  "hoodie.datasource.read.begin.instanttime": "20210709030000",
}
df = spark.read.format("hudi"). \
  options(**hudi_options). \
  load(f'{basePath}/*/*/*')

df.createOrReplaceTempView("inc_begin_table")
spark.sql("select _hoodie_commit_time, uuid from inc_begin_table order by 1").show(30)

print("beginとendを指定指定し、2021/07/09 03:00:00 以前のデータを取得 ---")
hudi_options = {
  'table.type': 'COPY_ON_WRITE',
  "hoodie.datasource.query.type": "incremental",
  "hoodie.datasource.read.begin.instanttime": "000",
  "hoodie.datasource.read.end.instanttime": "20210709030000",
}
df = spark.read.format("hudi"). \
  options(**hudi_options). \
  load(f'{basePath}/*/*/*')

df.createOrReplaceTempView("inc_end_table")
spark.sql("select _hoodie_commit_time, uuid from inc_end_table order by 1").show(30)

print("beginとendを方指定し、2021/07/09 02:30:00 〜 2021/07/09 03:00:00のデータを取得 ---")
hudi_options = {
  'table.type': 'COPY_ON_WRITE',
  "hoodie.datasource.query.type": "incremental",
  "hoodie.datasource.read.begin.instanttime": "20210709023000",
  "hoodie.datasource.read.end.instanttime": "20210709030000",
}
df = spark.read.format("hudi"). \
  options(**hudi_options). \
  load(f'{basePath}/*/*/*')

df.createOrReplaceTempView("inc_begin_end_table")
spark.sql("select _hoodie_commit_time, uuid from inc_begin_end_table order by 1").show(30)

job.commit()

結果は以下のようになりました。

beginのみ指定し、2021/07/09 03:00:00 以降のデータを取得 ---
+-------------------+--------------------+
|_hoodie_commit_time|                uuid|
+-------------------+--------------------+
|     20210709030044|51487979-37a5-4a8...|
|     20210709030044|e790c957-0392-43a...|
|     20210709031236|92fb35f5-fb05-4ac...|
|     20210709031236|89808432-5b3c-4fe...|
+-------------------+--------------------+

beginとendを指定指定し、2021/07/09 03:00:00 以前のデータを取得 ---
+-------------------+--------------------+
|_hoodie_commit_time|                uuid|
+-------------------+--------------------+
|     20210709012107|0b4c7277-69d2-40b...|
|     20210709012107|deb24798-aa25-42f...|
|     20210709013053|3c370dc4-a1c3-4ae...|
|     20210709013053|d7f201d3-0f18-4a5...|
|     20210709014144|85b64f08-2426-4bf...|
|     20210709014144|42b96652-c863-499...|
|     20210709015128|7511187f-43da-430...|
|     20210709015128|9851813b-4891-4e2...|
|     20210709020043|db1137ec-88ec-454...|
|     20210709020043|7783bde9-464a-4cb...|
|     20210709021233|d45c2565-289d-49a...|
|     20210709021233|e897a02f-adae-482...|
|     20210709022124|c8410328-a91a-439...|
|     20210709022124|3a1ecd61-aed0-44a...|
|     20210709023125|f83eb882-b269-43a...|
|     20210709023125|8e463395-d353-4d9...|
|     20210709024118|45f102fc-168e-458...|
|     20210709024118|8f237f09-c641-4ed...|
|     20210709025158|1310c3d4-4676-44f...|
|     20210709025158|8b975d1c-9ed6-4ce...|
+-------------------+--------------------+

beginとendを方指定し、2021/07/09 02:30:00 〜 2021/07/09 03:00:00のデータを取得 ---
+-------------------+--------------------+
|_hoodie_commit_time|                uuid|
+-------------------+--------------------+
|     20210709023125|8e463395-d353-4d9...|
|     20210709023125|f83eb882-b269-43a...|
|     20210709024118|8f237f09-c641-4ed...|
|     20210709024118|45f102fc-168e-458...|
|     20210709025158|1310c3d4-4676-44f...|
|     20210709025158|8b975d1c-9ed6-4ce...|
+-------------------+--------------------+

それぞれ期待した通りの値が取得できました。

まとめ

Apache HudiのCopy On Writeテーブルで利用可能なクエリを試してみました。 日付データを持つカラムを利用することなく、実際に取り込んだ時間ベースでクエリができることはすごく便利そうだと感じました。

おかしなデータが混入した場合の特定にも使えそうです。

参照

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。