[Apache Hudi]Bootsrap機能を使ってデータを登録する

2021.03.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の森脇です。

AWSから、Apache Hudiに関するAmazon EMRのニュースがありました。

New features from Apache Hudi available in Amazon EMR

Apache Hudiのいくつかの新機能がAmazon EMRにて利用可能になったようです。

この機能のうちの「ブートストラップ機能」が便利そうだったので実際に試してみました。

ブートストラップ機能とは

バージョン0.6.0で追加された機能です。

リリースノートから引用:

Bootstrapping existing parquet datasets : Adds support for bootstrapping existing datasets into Hudi, via both Spark datasource writer and deltastreamer tool, with support for reading from Hive, SparkSQL, AWS Athena (prestoDB support coming soon). See RFC-15 for technical details. Note that this is an experimental feature, which will be improved upon further in the 0.6.x versions.

通常、Apache Hudiにデータを登録する際にはSparkのデータフレームを入力にします。

そのため、ファイルを入力にしたい場合一度データフレームに読み込む必要がありました。

また、登録後のparquetファイルはメタデータと実データが混在する状態になっており、入力で使ったファイルとは中身が異なります。

このブートストラップ機能を利用すると「実データ」と「メタデータ」を分離してファイル化することが可能になります。

つまり、元々存在しているデータファイルには変更を行わずにHudiを利用することが可能です。 (※データファイルのフォーマットはparquet形式である必要があります)

試してみる

前述のAWSのブログではAmazon EMRを利用していました。

Glueジョブでも利用可能か試す意味もかねて、今回はGlueジョブを使ってみます。

Glueジョブの設定は、以前のブログと同じようにしました。

データファイル構成

日付毎のファイルを3つ配置し、入力とします。

s3://apache-hudi-bootstrap-test-data/test-data/
  20210311-data.parquet
  20210312-data.parquet
  20210313-data.parquet

それぞれのファイルの中身は以下のようにしました。

20210311-data.parquet

date id data
2021/03/11 20210311-0001 データ1
2021/03/11 20210311-0002 データ2
2021/03/11 20210311-0003 データ3
2021/03/11 20210311-0004 データ4
2021/03/11 20210311-0005 データ5

20210312-data.parquet

date id data
2021/03/12 20210312-0001 データ6
2021/03/12 20210312-0002 データ7
2021/03/12 20210312-0003 データ8
2021/03/12 20210312-0004 データ9
2021/03/12 20210312-0005 データ10

20210313-data.parquet

date id data
2021/03/13 20210313-0001 データ11
2021/03/13 20210313-0002 データ12
2021/03/13 20210313-0003 データ13
2021/03/13 20210313-0004 データ14
2021/03/13 20210313-0005 データ15

Glueジョブプログラム

前述のデータファイルが格納されたバケットを入力としてApache Hudiにブートストラップします。

ブートストラップしたデータを参照し、あわせてGlueデータカタログにも登録しています。

データ登録後に、

  • データフレームで参照可能か?
  • Athenaから参照可能か
  • の2点を確認します。

    import sys
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import StructType
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
    sc = spark.sparkContext
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    tableName = 'hudi_test_data_table' # テーブル名
    inputPath = "s3://apache-hudi-bootstrap-test-data/test-data/" # parquetファイルが格納されたバケットパス
    hudiTablePath = f"s3://apache-hudi-bootstrap-test-meta/{tableName}/" # メタデータ(Hudiテーブル)の作成先。
    
    
    # Hudiのオプション
    hudi_options = {
      'hoodie.table.name': tableName, # テーブル名
      # 書き込みオプション
      'hoodie.datasource.write.recordkey.field': 'id', # レコードキーのカラム名
      'hoodie.datasource.write.table.name': tableName, # テーブル名
      'hoodie.datasource.write.operation': 'bootstrap', # ブートストラップにて書き込み
    
      # ブートストラップオプション
      'hoodie.bootstrap.base.path': inputPath, # データファイルのパス
      'hoodie.bootstrap.keygen.class': 'org.apache.hudi.keygen.SimpleKeyGenerator', # キージェネレーター
     
      
    
      # データカタログ連携オプション(hive_sync)
      'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
      'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名
      'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
      'hoodie.datasource.hive_sync.use_jdbc': 'false' # jdbcを利用すると接続エラーになったのでfalseにする。
    }
    
    
    # データの書き込み
    # 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
    schema = StructType([])
    emptyDataFrame = spark.createDataFrame(sc.emptyRDD(), schema)
    emptyDataFrame.write.format("hudi"). \
      options(**hudi_options). \
      mode("overwrite"). \
      save(hudiTablePath)
    
    
    # 書き込んだ結果を参照
    # 書き込んだ結果をSpark SQLでクエリ
    sfDF = spark. \
      read. \
      format("hudi"). \
      load(hudiTablePath)
    
    sfDF.createOrReplaceTempView("sf_table")
    
    spark.sql("select * from sf_table").show()
    
    job.commit()

    実行し、完了を待ちます。

    完了後、それぞれのリソースがどのようになったか確認します。

    データファイルS3バケット:

    $ aws --profile csa s3 ls --recursive s3://apache-hudi-bootstrap-test-data/
    2021-03-11 20:00:02          0 test-data/
    2021-03-11 20:00:13       2834 test-data/20210311-data.parquet
    2021-03-11 20:00:14       2838 test-data/20210312-data.parquet
    2021-03-11 20:00:15       2842 test-data/20210313-data.parquet

    余計なファイルは作成されておらず、アップロード時のまま変わっていません!

    メタデータS3バケット:

    $ aws --profile csa s3 ls s3://apache-hudi-bootstrap-test-meta/hudi_test_data_table/ 
                               PRE .hoodie/
    2021-03-11 20:38:28          0 .hoodie_$folder$
    2021-03-11 20:38:45         93 .hoodie_partition_metadata
    2021-03-11 20:38:46     433932 593ae849-63ad-47ab-a2da-0ddb39e6fe2a_1499-0-1499_00000000000001.parquet
    2021-03-11 20:38:46     433910 db5993e1-1d27-406a-a746-2d22fb40f629_499-0-499_00000000000001.parquet
    2021-03-11 20:38:46     433914 ef9be2f9-8a49-4175-91ee-c1ebdc48fd30_999-0-999_00000000000001.parquet

    いくつかのparquetファイルが作成されています。

    S3 Selectを利用して、ファイルの中身を確認してみます。

    レコードキーとして指定した「id」カラム以外は実データの情報は存在せず、メタデータのみがファイル化されていますね。

    Hudiテーブルから読み込んだデータフレームの表示結果をCloudwatch logsで確認してみます。

    メタデータ,実データも全てクエリできています!

    Glueデータカタログはどうでしょうか。

    通常のテーブル作成時と同じように、メタカラムと実データカラムが含まれたテーブルが作成されました!

    いい感じですね。

    最後に、このテーブルに対してAthenaでクエリをかけてみます。

    ・・・メタデータは取得できましたが、実データのカラムはすべて値が空になってしまいました。。。

    0.6.0時点ではまだ対応されていないのかもしれませんね。

    引き続き検証していきたいと思います。

    まとめ

    Apache Hudiのブートストラップ機能をGlueジョブで試してみました。

    実データ環境を汚すことなくApache Hudiを利用可能ですので、すでに存在するデータセットでApache Hudiを使いたい場合のハードルが下がりました。

    一方で、ブートストラップで作成したテーブルはAthenaでクエリを正しく行えませんでした。

    設定が悪いだけかもしれませんが、もし対応されていないのであれば今後の対応が待ち遠しいです!

    最新バージョンの0.7.0ではどうなのかも今後試していこうと思います。

    参照

    ※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。