[Apache Hudi] Merge On Readテーブルを試してみる

2021.07.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データ・アナリティクス事業本部の森脇です。

以前、Apache Hudiの「Copy On Write」テーブル(以降、CoW)について検証を行いました。

今回は「Merge On Read」テーブルについて試していこうと思います。

前回の記事はこちら:

Apache HudiのCopy On Writeテーブルを試してみる

Merge On Read テーブルとは

CoWテーブルは、データの変更操作(INSERT/UPDATE/DELETE)の度に最新データが反映されたparquetファイルを作成する仕組みとなっていました。

この仕組みは高速にReadが行える一方で、毎回新しいparquetファイルを生成するのでWrite処理はコストが大きく、書き込み操作に時間がかかってしまう欠点があります。

Merge On Readは、CoWとは別のアプローチでデータ管理を行います。

公式ドキュメントのConceptsの記載を抜粋します。

Merge on read table is a superset of copy on write, in the sense it still supports read optimized queries of the table by exposing only the base/columnar files in latest file slices. Additionally, it stores incoming upserts for each file group, onto a row based delta log, to support snapshot queries by applying the delta log, onto the latest version of each file id on-the-fly during query time. Thus, this table type attempts to balance read and write amplification intelligently, to provide near real-time data. The most significant change here, would be to the compactor, which now carefully chooses which delta log files need to be compacted onto their columnar base file, to keep the query performance in check (larger delta log files would incur longer merge times with merge data on query side)

ポイントをまとめると、以下のようでした。

  • CoWのスーパーセット
  • CoWでは利用できない「読み取り最適化クエリ」をサポートしている
  • parquetファイル(base/columnar file)とavroファイル(row based delta log)の組み合わせでデータを構成する
  • 頻繁に書き込みが発生するワークロードで有効

変更データを差分データとして別で管理することによって、書き込みのワークロードを短時間で処理することができるようです。

また、コンパクションを行うことにより差分ファイルとベースファイルを統合することも可能なようです。

実際にどのような挙動になるのか、AWS Glueを使って検証してみました。

試してみる

前回のCoWテーブル検証で利用したデータと同じものを利用して、違いを探っていきます。

初期データ作成

以下の6レコードを挿入します。

「time」カラムがパーティションキーです。

Glueジョブのソースコード:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_mor' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'

# (1) 初回データ
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/18", "00001", "A"),
    ("2020/12/18", "00002", "A"),
    ("2020/12/19", "00003", "A"),
    ("2020/12/19", "00004", "A"),
    ("2020/12/20", "00005", "A"),
    ("2020/12/20", "00006", "A"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', # Merge On Readでの書き込み
  'hoodie.compact.inline': 'false', # 「true」にすると、INSERT/UPDATE/DELETE後に自動でコンパクションが実行される
  'hoodie.compact.inline.max.delta.commits': 20,
  'hoodie.parquet.small.file.limit': 0,
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが存在する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)


job.commit()

hoodie.datasource.write.storage.typeMERGE_ON_READを指定することでMerge On Readテーブルを作成できます。

Glueジョブを実行後、S3バケットで作成されたオブジェクトを確認します。

$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_mor/
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.fileids_$folder$
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.partitions_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux/.bootstrap_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.temp_$folder$
2021-07-02 13:39:12       3217 hudi_sample_mor/.hoodie/20210702043848.deltacommit
2021-07-02 13:39:04       2143 hudi_sample_mor/.hoodie/20210702043848.deltacommit.inflight
2021-07-02 13:39:03          0 hudi_sample_mor/.hoodie/20210702043848.deltacommit.requested
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/archived_$folder$
2021-07-02 13:38:52        328 hudi_sample_mor/.hoodie/hoodie.properties
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/18/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/18/d575e340-8874-46e8-8435-b1f0c9347505-0_0-6-56_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/18_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/19/.hoodie_partition_metadata
2021-07-02 13:39:10     434738 hudi_sample_mor/2020/12/19/03be68bb-3630-45fc-94f3-cb6825ea4c89-0_1-6-57_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/19_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/20/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/20/035cb985-46ee-4fd0-95ba-baae567e5e66-0_2-6-58_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/20_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020_$folder$

パーティション毎にparquetファイルが作成されました。これはCoWテーブルの時と同じ挙動です。

ファイルとデータのイメージはこのようになります。

parquetファイルの中身もCoWと同じ形式です(S3 Selectで確認)。

2020/12/18のデータ:

(2) INSERT

「2020/12/20」と「2020/12/21」のデータを1レコードづつINSERTします。

ソースコード:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_mor' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (2) レコードを追加で挿入(Insert)
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/20", "00007", "A"),
    ("2020/12/21", "00008", "A"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', # Merge On Readでの書き込み
  'hoodie.compact.inline': 'false', # 「true」にすると、INSERT/UPDATE/DELETE後に自動でコンパクションが実行される
  'hoodie.compact.inline.max.delta.commits': 20,
  'hoodie.parquet.small.file.limit': 0,
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

実行後、結果を確認します。

[jmc-dev@0b947fbdc2fc gorm]$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_mor/
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.fileids_$folder$
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.partitions_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux/.bootstrap_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.temp_$folder$
2021-07-02 13:39:12       3217 hudi_sample_mor/.hoodie/20210702043848.deltacommit
2021-07-02 13:39:04       2143 hudi_sample_mor/.hoodie/20210702043848.deltacommit.inflight
2021-07-02 13:39:03          0 hudi_sample_mor/.hoodie/20210702043848.deltacommit.requested
2021-07-02 13:51:51       2371 hudi_sample_mor/.hoodie/20210702045115.deltacommit
2021-07-02 13:51:44       1561 hudi_sample_mor/.hoodie/20210702045115.deltacommit.inflight
2021-07-02 13:51:42          0 hudi_sample_mor/.hoodie/20210702045115.deltacommit.requested
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/archived_$folder$
2021-07-02 13:38:52        328 hudi_sample_mor/.hoodie/hoodie.properties
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/18/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/18/d575e340-8874-46e8-8435-b1f0c9347505-0_0-6-56_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/18_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/19/.hoodie_partition_metadata
2021-07-02 13:39:10     434738 hudi_sample_mor/2020/12/19/03be68bb-3630-45fc-94f3-cb6825ea4c89-0_1-6-57_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/19_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/20/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/20/035cb985-46ee-4fd0-95ba-baae567e5e66-0_2-6-58_20210702043848.parquet
2021-07-02 13:51:50     434647 hudi_sample_mor/2020/12/20/556a153e-95cc-4ded-812b-d76cbc54e379-0_0-7-66_20210702045115.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/20_$folder$
2021-07-02 13:51:49         93 hudi_sample_mor/2020/12/21/.hoodie_partition_metadata
2021-07-02 13:51:50     434646 hudi_sample_mor/2020/12/21/7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_1-7-67_20210702045115.parquet
2021-07-02 13:51:48          0 hudi_sample_mor/2020/12/21_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020_$folder$
  • hudi_sample_mor/2020/12/20/556a153e-95cc-4ded-812b-d76cbc54e379-0_0-7-66_20210702045115.parquet
  • hudi_sample_mor/2020/12/21/7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_1-7-67_20210702045115.parquet

の2つのファイルが追加されました。

2020/12/21のファイルの中身を確認してみます。

2020/12/21のデータのみのparquetファイルですね。CoWと同じです。

では、2020/12/20の方はどうでしょうか。

INSERT処理で追加した1レコードのみがparquetファイル化されていました

CoWの場合には初期データ2レコード+新規1レコードの、計3レコード存在するparquetファイルが作成されたので、違う挙動になりました。

触れ込み通り、Merge On Readでは差分ファイルが作成されていることを確認できました。

(3) UPDATE

INSERTで追加した2レコードの「option」カラムの値を、「A」から「B」に更新してみます。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_mor' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (3) レコードを更新(Update)
# 「option」列のデータを変更する
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/20", "00007", "B"),
    ("2020/12/21", "00008", "B"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', # Merge On Readでの書き込み
  'hoodie.compact.inline': 'false', # 「true」にすると、INSERT/UPDATE/DELETE後に自動でコンパクションが実行される
  'hoodie.compact.inline.max.delta.commits': 20,
  'hoodie.parquet.small.file.limit': 0,
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'upsert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

実行結果:

[jmc-dev@0b947fbdc2fc gorm]$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_mor/
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.fileids_$folder$
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.partitions_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux/.bootstrap_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.temp_$folder$
2021-07-02 13:39:12       3217 hudi_sample_mor/.hoodie/20210702043848.deltacommit
2021-07-02 13:39:04       2143 hudi_sample_mor/.hoodie/20210702043848.deltacommit.inflight
2021-07-02 13:39:03          0 hudi_sample_mor/.hoodie/20210702043848.deltacommit.requested
2021-07-02 13:51:51       2371 hudi_sample_mor/.hoodie/20210702045115.deltacommit
2021-07-02 13:51:44       1561 hudi_sample_mor/.hoodie/20210702045115.deltacommit.inflight
2021-07-02 13:51:42          0 hudi_sample_mor/.hoodie/20210702045115.deltacommit.requested
2021-07-02 14:05:21       2477 hudi_sample_mor/.hoodie/20210702050407.deltacommit
2021-07-02 14:05:19       2881 hudi_sample_mor/.hoodie/20210702050407.deltacommit.inflight
2021-07-02 14:05:12          0 hudi_sample_mor/.hoodie/20210702050407.deltacommit.requested
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/archived_$folder$
2021-07-02 13:38:52        328 hudi_sample_mor/.hoodie/hoodie.properties
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/18/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/18/d575e340-8874-46e8-8435-b1f0c9347505-0_0-6-56_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/18_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/19/.hoodie_partition_metadata
2021-07-02 13:39:10     434738 hudi_sample_mor/2020/12/19/03be68bb-3630-45fc-94f3-cb6825ea4c89-0_1-6-57_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/19_$folder$
2021-07-02 14:05:21        838 hudi_sample_mor/2020/12/20/.556a153e-95cc-4ded-812b-d76cbc54e379-0_20210702045115.log.1_0-23-129
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/20/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/20/035cb985-46ee-4fd0-95ba-baae567e5e66-0_2-6-58_20210702043848.parquet
2021-07-02 13:51:50     434647 hudi_sample_mor/2020/12/20/556a153e-95cc-4ded-812b-d76cbc54e379-0_0-7-66_20210702045115.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/20_$folder$
2021-07-02 14:05:21        838 hudi_sample_mor/2020/12/21/.7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_20210702045115.log.1_1-23-130
2021-07-02 13:51:49         93 hudi_sample_mor/2020/12/21/.hoodie_partition_metadata
2021-07-02 13:51:50     434646 hudi_sample_mor/2020/12/21/7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_1-7-67_20210702045115.parquet
2021-07-02 13:51:48          0 hudi_sample_mor/2020/12/21_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020_$folder$

新しいparquetファイルは作成されず、代わりにドットから始まる謎の隠しファイルが作成されました。

  • hudi_sample_mor/2020/12/20/.556a153e-95cc-4ded-812b-d76cbc54e379-0_20210702045115.log.1_0-23-129
  • hudi_sample_mor/2020/12/21/.7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_20210702045115.log.1_1-23-130

テキストエディターで中身を確認してみます。

2020/12/20のファイル:

ところどころ文字化けしているため、バイナリファイルのようです。

「avro schema」の定義が見えることから、おそらくavro形式のファイルのようです。 (avroフォーマットを拡張したApache Hudi独自のdelta logファイル。と言ったほうが正しそうです)

UPDATEしたデータはavro形式でdelta logファイル化されました。

INSERTではparquetファイルが作成され、UPDATEではavro形式で作成されるのが面白いですね。

(4) DELETE

INSERT/UPDATEを行った2レコードを削除します。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


tableName = 'hudi_sample_mor' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# (4) レコードを削除(Delete)
# 削除する場合は、削除したいDataFrameを作成しそれをdelete操作でappendする
schema = ["time", "transaction_id", "option"]
data = [
    ("2020/12/20", "00007", "B"),
    ("2020/12/21", "00008", "B"),
]
df = spark.createDataFrame(data, schema)

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', # Merge On Readでの書き込み
  'hoodie.compact.inline': 'false', # 「true」にすると、INSERT/UPDATE/DELETE後に自動でコンパクションが実行される
  'hoodie.compact.inline.max.delta.commits': 20,
  'hoodie.parquet.small.file.limit': 0,
  'hoodie.datasource.write.recordkey.field': 'transaction_id', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'time', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'delete', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'option', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)


job.commit()

CoWの場合は、「2020/12/21」には、空のparquetファイルが作成されました。

Merge On Readではどうなるでしょうか。

[jmc-dev@0b947fbdc2fc ~]$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake/hudi_sample_mor/
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.fileids_$folder$
2021-07-02 13:38:52          0 hudi_sample_mor/.hoodie/.aux/.bootstrap/.partitions_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux/.bootstrap_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.aux_$folder$
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/.temp_$folder$
2021-07-02 13:39:12       3217 hudi_sample_mor/.hoodie/20210702043848.deltacommit
2021-07-02 13:39:04       2143 hudi_sample_mor/.hoodie/20210702043848.deltacommit.inflight
2021-07-02 13:39:03          0 hudi_sample_mor/.hoodie/20210702043848.deltacommit.requested
2021-07-02 13:51:51       2371 hudi_sample_mor/.hoodie/20210702045115.deltacommit
2021-07-02 13:51:44       1561 hudi_sample_mor/.hoodie/20210702045115.deltacommit.inflight
2021-07-02 13:51:42          0 hudi_sample_mor/.hoodie/20210702045115.deltacommit.requested
2021-07-02 14:05:21       2477 hudi_sample_mor/.hoodie/20210702050407.deltacommit
2021-07-02 14:05:19       2881 hudi_sample_mor/.hoodie/20210702050407.deltacommit.inflight
2021-07-02 14:05:12          0 hudi_sample_mor/.hoodie/20210702050407.deltacommit.requested
2021-07-02 14:19:41       2477 hudi_sample_mor/.hoodie/20210702051841.deltacommit
2021-07-02 14:19:38       2881 hudi_sample_mor/.hoodie/20210702051841.deltacommit.inflight
2021-07-02 14:18:44          0 hudi_sample_mor/.hoodie/20210702051841.deltacommit.requested
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie/archived_$folder$
2021-07-02 13:38:52        328 hudi_sample_mor/.hoodie/hoodie.properties
2021-07-02 13:38:51          0 hudi_sample_mor/.hoodie_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/18/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/18/d575e340-8874-46e8-8435-b1f0c9347505-0_0-6-56_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/18_$folder$
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/19/.hoodie_partition_metadata
2021-07-02 13:39:10     434738 hudi_sample_mor/2020/12/19/03be68bb-3630-45fc-94f3-cb6825ea4c89-0_1-6-57_20210702043848.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/19_$folder$
2021-07-02 14:05:21        838 hudi_sample_mor/2020/12/20/.556a153e-95cc-4ded-812b-d76cbc54e379-0_20210702045115.log.1_0-23-129
2021-07-02 14:19:40        820 hudi_sample_mor/2020/12/20/.556a153e-95cc-4ded-812b-d76cbc54e379-0_20210702045115.log.2_0-30-174
2021-07-02 13:39:09         93 hudi_sample_mor/2020/12/20/.hoodie_partition_metadata
2021-07-02 13:39:10     434737 hudi_sample_mor/2020/12/20/035cb985-46ee-4fd0-95ba-baae567e5e66-0_2-6-58_20210702043848.parquet
2021-07-02 13:51:50     434647 hudi_sample_mor/2020/12/20/556a153e-95cc-4ded-812b-d76cbc54e379-0_0-7-66_20210702045115.parquet
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12/20_$folder$
2021-07-02 14:05:21        838 hudi_sample_mor/2020/12/21/.7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_20210702045115.log.1_1-23-130
2021-07-02 14:19:40        820 hudi_sample_mor/2020/12/21/.7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_20210702045115.log.2_1-30-175
2021-07-02 13:51:49         93 hudi_sample_mor/2020/12/21/.hoodie_partition_metadata
2021-07-02 13:51:50     434646 hudi_sample_mor/2020/12/21/7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_1-7-67_20210702045115.parquet
2021-07-02 13:51:48          0 hudi_sample_mor/2020/12/21_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020/12_$folder$
2021-07-02 13:39:08          0 hudi_sample_mor/2020_$folder$

UPDATE操作時と同様に、parquetファイルは作成されずavroファイルが作成されました。

  • hudi_sample_mor/2020/12/20/.556a153e-95cc-4ded-812b-d76cbc54e379-0_20210702045115.log.2_0-30-174
  • hudi_sample_mor/2020/12/21/.7291b495-bbdd-41dd-86d2-07511e0b4bfe-0_20210702045115.log.2_1-30-175

中身を確認してみます。

2020/12/20のファイル:

2020/12/21のファイル:

同じ中身のファイルが作成されています。

また、UPDATE時に作成された時のavroファイルにはデータらしきものが入っていましたが、こちらのファイルには存在しません。

「削除したことを表すdelta logファイル」ということっぽいですね。

まとめ

Merge On ReadテーブルでのCRUD操作を試してみた結果、CoWとは違う戦略でデータを構成していることがわかりました。

操作によって別のフォーマットファイルを作成する仕組みはとても興味深いですね。

Apache Hudiは現在も活発に開発が続けられているOSSですので、今後もウォッチしていこうと思います。

次回は、今回触らなかった「読み取り最適化クエリ」,「コンパクション」を試していこうと思います。

参考

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。