[Apache Hudi] 0.9.0の新機能まとめ

2021.09.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データ・アナリティクス事業本部の森脇です。

先日、Apache Hudiの新しいバージョンである0.9.0がリリースされました。

いくつかの新機能が追加されており、その中から個人的に気になったものをまとめてみました。

気になった機能

Spark SQLのDML、DDLをサポート(experimental)

Apache Hudiテーブルの作成/データ操作にSpark SQLを利用できるようになりました。

これにより、CREATE TABLE ... AS SELECT(いわゆるCTAS), INSERT, MERGE INTO, UPDATE, DELETEを利用することが可能です。

例えば、今まではデータを削除したい場合、deleteオプションを指定してwrite操作が必要でした。

deleteDf.write.format("hudi"). \
  options(**hudi_delete_options). \
  mode("append"). \
  save(basePath)

そのため、まず削除したいデータのDataFrameを作成する必要があり、少し面倒でした。

0.9.0からは

spark.sql("DELETE FROM xyzTable WHERE col1 = 'foo'")

と、SQL操作でデータ削除をすることが可能となります。

まだ実験的なサポートではありますが、今後のブラッシュアップが楽しみな機能です。

(実際に試してみたところ、パーティションがあるテーブルでエラーが発生したり、UPDATEがUnsupporttedエラーになったりとまだ実用的ではありませんでした)

タイムトラベルクエリ

特定時刻のスナップショットに対してデータをクエリできるようになりました。

as.of.instantに時刻を指定することで、その時刻時点でのデータをクエリすることが可能です。

3つの形式で時刻を指定することが可能です。

spark.read.
  format("hudi").
  option("as.of.instant", "20210728141108").
  load(basePath)

spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28 14: 11: 08").
  load(basePath)

// "as.of.instant = 2021-07-28 00:00:00" と同じ扱い
spark.read.
  format("hudi").
  option("as.of.instant", "2021-07-28").
  load(basePath)

現在でもインクリメンタルクエリでbeginTimeに「000」を指定することで同じことができますが、タイムトラベルクエリは今後DMLでもサポートされる可能性があるようです。

この場合、以下のような書き方が可能になります。

select a,b,c from hudi_table AS OF 20210728141108

0.9.0ではCTAS(CREATE TABLE AS SELECT)もサポートされました。 そのため、CTASとタイムトラベルクエリを組み合わせることで、過去のスナップショットテーブルを簡単に作成することができるようになりそうです。

パーティション削除操作の追加

Hudiテーブルのパーティションを削除する操作が追加されました。

以下のように操作することで、既存のテーブルに対してパーティションの削除が可能です。

df.write.
  format("hudi").
  option("hoodie.datasource.write.operation", "delete_partition").   # パーティション削除用操作 
  option("hoodie.datasource.write.partitions.to.delete", "col1,col2").   # 削除したいパーティションをカンマ区切りで指定

コミット前バリデーション機能の追加

コミット前バリデーターを設定することができるようになりました。

これによりコミット前にデータ検証を行えるようになり、NGだった場合はロールバックできます。

利用したいバリデーター名をhoodie.precommit.validatorsオプションに指定することで登録します。(「,」で区切ることで複数個設定可能)

組み込みのバリデーターとして、以下3種類が用意されています。

名称 機能
org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator 変更前後で結果が同一行の場合、検証OK
org.apache.hudi.client.validator.SqlQueryInequalityPreCommitValidator 変更前後で結果が同一行でない場合、検証OK(SqlQueryEqualityPreCommitValidatorの否定版)
org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator 変更後の結果が特定の値の場合、検証OK

これらは、SQLを指定する別のオプションと併用します。

対応するクラス SQLを設定するオプション名
SqlQueryEqualityPreCommitValidator hoodie.precommit.validators.equality.sql.queries
SqlQueryInequalityPreCommitValidator hoodie.precommit.validators.inequality.sql.queries
SqlQuerySingleResultPreCommitValidator hoodie.precommit.validators.single.value.sql.queries

例えば、以下のように利用することが可能です。

# Append後にレコード件数が同一であったら検証OK
df.write.format("hudi"). \
  options(**hudi_options). \
  option("hoodie.precommit.validators", "org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator"). \
  option("hoodie.precommit.validators.equality.sql.queries", "SELECT COUNT(*) FROM \<TABLE_NAME>"). \  # 「\<TABLE_NAME>」は、コミット対象のテーブル名に置換される
  mode("Append"). \
  save(basePath)

また、SparkPreCommitValidatorクラスを継承することで独自のバリデーターを作成することも可能です。

S3イベントトリガーのDeltaStreamerの追加

S3イベント駆動のDeltaStreamerが追加されました。

これを使うと、S3オブジェクトの追加をトリガーに、でHudiへのデータ取り込みを実行することが可能となります。

こちらに関しては公式がAWS構成図付きでブログを記載しています。

まとめ

Apache Hudi 0.9.0の特に気になった新機能を紹介しました。

今後予定されているアップデートにも気になるものがたくさんあるので、引き続きウォッチしていきたいです。

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。