Apache HudiのCopy On Writeテーブルを試してみる

2020.12.21

データアナリティクス事業本部の森脇です。

今回は、Apache Hudiのテーブル仕様を理解していきたいと思います。

Apache Hudiとは?

Apache Software FoundationのプロジェクトであるOSSです。

Apache Hudi公式サイト(https://hudi.apache.org/)

データレイクを構築する際に利用可能で、ストリームデータ、ファイルデータをクラウドストレージ上で管理することが可能になります。

Apache Hudiは、2種類のテーブルを作成することが可能です。

  • 「Copy On Write」 テーブル
  • 「Read On Merge」 テーブル

今回は、このうちの「Copy On Write」を試していきたいと思います。

Copy On Write テーブル

公式ドキュメントのConceptsにCopy On Writeテーブルに関する記載があります。

以下抜粋:

File slices in Copy-On-Write table only contain the base/columnar file and each commit produces new versions of base files. In other words, we implicitly compact on every commit, such that only columnar data exists. As a result, the write amplification (number of bytes written for 1 byte of incoming data) is much higher, where read amplification is zero. This is a much desired property for analytical workloads, which is predominantly read-heavy.

ポイントをまとめてみると、以下のようでした。

  • カラムナベースのファイル(parquet)を利用する
  • コミット毎に新しいバージョンのファイルが作成される
  • 書き込みのコストが大きい
  • 読み込み量の多いワークロードで有効

実際に試してみる

実際にどのような挙動になるか試してみましょう。

以前の記事と同様に、今回もGlueジョブを使って試します。

Glueジョブの設定方法は以前の記事を参照ください。

AWS上でApache Hudiを動かす

いくつかの操作を順番に行い、完了後のファイルがどのようになっているかを確認していきたいと思います。

(1) テーブル作成&初回データ挿入

6行の簡単なデータを初回データとして挿入します。

ジョブ:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_cow' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (1) 初回データ
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/18", "00001", "A"),
    ("2020/12/18", "00002", "A"),
    ("2020/12/19", "00003", "A"),
    ("2020/12/19", "00004", "A"),
    ("2020/12/20", "00005", "A"),
    ("2020/12/20", "00006", "A"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)


job.commit()

ジョブを実行し、完了後にS3の構成を見てみましょう。

$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake
2020-12-21 08:51:13          0 hudi_sample_cow/.hoodie/.aux/.bootstrap/.fileids_$folder$
2020-12-21 08:51:13          0 hudi_sample_cow/.hoodie/.aux/.bootstrap/.partitions_$folder$
2020-12-21 08:51:13          0 hudi_sample_cow/.hoodie/.aux/.bootstrap_$folder$
2020-12-21 08:51:13          0 hudi_sample_cow/.hoodie/.aux_$folder$
2020-12-21 08:51:12          0 hudi_sample_cow/.hoodie/.temp_$folder$
2020-12-21 08:51:32       3216 hudi_sample_cow/.hoodie/20201220235108.commit
2020-12-21 08:51:25          0 hudi_sample_cow/.hoodie/20201220235108.commit.requested
2020-12-21 08:51:26       2143 hudi_sample_cow/.hoodie/20201220235108.inflight
2020-12-21 08:51:12          0 hudi_sample_cow/.hoodie/archived_$folder$
2020-12-21 08:51:13        236 hudi_sample_cow/.hoodie/hoodie.properties
2020-12-21 08:51:12          0 hudi_sample_cow/.hoodie_$folder$
2020-12-21 08:51:30         93 hudi_sample_cow/2020/12/18/.hoodie_partition_metadata
2020-12-21 08:51:31     434735 hudi_sample_cow/2020/12/18/4c4cebbf-040d-4266-b047-c73e3c777068-0_0-6-56_20201220235108.parquet
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12/18_$folder$
2020-12-21 08:51:30         93 hudi_sample_cow/2020/12/19/.hoodie_partition_metadata
2020-12-21 08:51:31     434735 hudi_sample_cow/2020/12/19/f01b8261-aa5f-4482-bcb1-37018b54e1cc-0_1-6-57_20201220235108.parquet
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12/19_$folder$
2020-12-21 08:51:30         93 hudi_sample_cow/2020/12/20/.hoodie_partition_metadata
2020-12-21 08:51:31     434735 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_2-6-58_20201220235108.parquet
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12/20_$folder$
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12_$folder$
2020-12-21 08:51:29          0 hudi_sample_cow/2020_$folder$
2020-12-21 08:51:12          0 hudi_sample_cow_$folder$

「time」カラムでパーティションが切られ、パーティション毎にparquetファイルが1つずつ作成されています。

S3 Selectを使ってparquetファイルの中身を見てみましょう。

パーティションキーが「2020/12/18」である、2レコードのデータが入っていました。

先頭列にはメタカラムがついており、末尾の3カラムが挿入したデータですね。

パーティション毎にparquetファイルが1つ作成されました。(データ量が多い場合はどうなるのかな?)

(2) レコードを追加で挿入(Insert)

(1)を実行した状態で、追加のレコードを挿入してみます。

  • パーティションが「2020/12/20」のレコード(すでに存在しているパーティション)
  • パーティションが「2020/12/21」のレコード(新規パーティション)

の2レコードを挿入してみます。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_cow' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (2) レコードを追加で挿入(Insert)
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/20", "00007", "A"),
    ("2020/12/21", "00008", "A"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

ジョブを実行し、再度S3を確認しましょう。

$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_cow/2020/12/2
2020-12-21 08:51:30         93 hudi_sample_cow/2020/12/20/.hoodie_partition_metadata
2020-12-21 08:53:49     434429 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_0-7-66_20201220235328.parquet
2020-12-21 08:51:31     434735 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_2-6-58_20201220235108.parquet
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12/20_$folder$
2020-12-21 08:53:48         93 hudi_sample_cow/2020/12/21/.hoodie_partition_metadata
2020-12-21 08:53:49     434644 hudi_sample_cow/2020/12/21/0915138b-97e7-49e6-af95-140f780284e2-0_1-7-67_20201220235328.parquet
2020-12-21 08:53:47          0 hudi_sample_cow/2020/12/21_$folder$

「2020/12/21」のparquetファイルも追加され、「2020/12/20」のparquetファイルは2つに増えました。

両者の中身を確認してみましょう。

parquetファイルの末尾にはタイムスタンプが付与されているため、「2020/12/20」のファイルは日付が新しいもの(今回のジョブ実行で追加されたもの)の中身を確認します。

2020/12/21のファイル:

追加した1レコードのみが入っているファイルです。

これはわかりやすいですね。

2020/12/20のファイル:

こちらは、「元々存在していた2レコード」と、「新規に追加した1レコード」の計3レコードが1ファイルに入っています。

公式ドキュメントの記載にあった通り、処理実行毎にコンパクションが行われ、新しいリビジョン向けのファイルが作成されていました。

データの差分だけのファイルが作成されるわけではなく、全データを含むマージされたファイルが作成されるのがポイントですね。

(3) レコードの更新(Update)

次はレコードを更新してみます。

(2)で追加した2つのレコードを更新してみます。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_cow' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (3) レコードを更新(Update)
# 「option」列のデータを変更する
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/20", "00007", "B"),
    ("2020/12/21", "00008", "B"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'upsert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

実行すると、やはりparquetファイルが増えていました。

$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_cow/2020/12/2
2020-12-21 08:51:30         93 hudi_sample_cow/2020/12/20/.hoodie_partition_metadata
2020-12-21 09:11:21     434437 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_0-23-128_20201221001030.parquet
2020-12-21 08:53:49     434429 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_0-7-66_20201220235328.parquet
2020-12-21 08:51:31     434735 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_2-6-58_20201220235108.parquet
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12/20_$folder$
2020-12-21 08:53:48         93 hudi_sample_cow/2020/12/21/.hoodie_partition_metadata
2020-12-21 09:11:21     434664 hudi_sample_cow/2020/12/21/0915138b-97e7-49e6-af95-140f780284e2-0_1-23-129_20201221001030.parquet
2020-12-21 08:53:49     434644 hudi_sample_cow/2020/12/21/0915138b-97e7-49e6-af95-140f780284e2-0_1-7-67_20201220235328.parquet
2020-12-21 08:53:47          0 hudi_sample_cow/2020/12/21_$folder$

「2020/12/20」のファイルだけ中身を確認してみましょう。

データは3レコード分存在し、更新を行ったレコードのデータは更新されています。

(4) レコードの削除(Delete)

最後は、(3)で更新した2レコードを削除します。

これを行うことにより、「2020/12/20」のデータは2レコードとなり、「2020/12/21」のデータはなくなります。

parquetファイル上はどのようになるのでしょうか。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_cow' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (4) レコードを削除(Delete)
# 削除する場合は、削除したいDataFrameを作成しそれをdelete操作でappendする
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/20", "00007", "B"),
    ("2020/12/21", "00008", "B"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'delete', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

実行後のファイル構成です。

[jmc-dev@4677d57f4b55 scripts]$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_cow/2020/12/2
2020-12-21 08:51:30         93 hudi_sample_cow/2020/12/20/.hoodie_partition_metadata
2020-12-21 09:11:21     434437 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_0-23-128_20201221001030.parquet
2020-12-21 09:27:59     434735 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_0-30-173_20201221002734.parquet
2020-12-21 08:53:49     434429 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_0-7-66_20201220235328.parquet
2020-12-21 08:51:31     434735 hudi_sample_cow/2020/12/20/2fb57f32-1994-4f3d-9230-81698cf8a004-0_2-6-58_20201220235108.parquet
2020-12-21 08:51:30          0 hudi_sample_cow/2020/12/20_$folder$
2020-12-21 08:53:48         93 hudi_sample_cow/2020/12/21/.hoodie_partition_metadata
2020-12-21 09:11:21     434664 hudi_sample_cow/2020/12/21/0915138b-97e7-49e6-af95-140f780284e2-0_1-23-129_20201221001030.parquet
2020-12-21 09:28:00     432433 hudi_sample_cow/2020/12/21/0915138b-97e7-49e6-af95-140f780284e2-0_1-30-174_20201221002734.parquet
2020-12-21 08:53:49     434644 hudi_sample_cow/2020/12/21/0915138b-97e7-49e6-af95-140f780284e2-0_1-7-67_20201220235328.parquet
2020-12-21 08:53:47          0 hudi_sample_cow/2020/12/21_$folder$

挿入や更新時と同様に、ファイルが追加されています。

12/20, 12/21の両方のファイルを見てみましょう。

2020/12/20:

レコードが削除され、2レコードだけのファイルになりました。

2020/12/21:

こちらは、中身が空のファイルができていました!

1レコードだけ存在している状態でそれを削除したので、たしかに理にかなった挙動ですね。

まとめ

「Copy On Write」テーブルは、シンプルなコンセプトながら柔軟にデータを格納できる仕組みでした。

データの更新後はparquetファイルが必ずコンパクションされるため、触れ込みどおり、読み込みが多いデータに対して有効で書き込みが多いデータは苦手そうです。

次回はもう1つのテーブル種別である「Merge On Read」テーブルを試していきます。

また、今回試していく中でいくつか疑問点が生まれたため、これも検証していきます。

  • 列の追加/削除はどうなるのか?
  • データ量が多い場合でも、parquetファイルはパーティション毎に1つなのか?

参考

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。