AWS上でApache Hudiを動かす

2020.11.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の森脇です。

今回は、AWS上でApache Hudiを動かしてみたいと思います。

Apache Hudiとは?

Apache Software Foundation(以降、ASF)のプロジェクトであるOSSです。

Apache Hudi公式サイト(https://hudi.apache.org/)

データレイクを構築する際に利用可能で、ストリームデータ、ファイルデータをクラウドストレージ上で管理することが可能になります。

Spark上で動作し、以下のような機能が特徴です。

  • 高速なUpsert
  • ロールバック機能
  • データリカバリ

元々はUberが開発していたプロジェクトですが、ASFに寄贈されたようです。

AWS上で動かす際には、いくつか選択肢があります。

Amazon EMRを使用する

Amazon EMRには、Apache Hudiがプリインストールされています。

そのため、EMRを使うだけで利用することが可能です。

AWS Glueを利用する

AWS GlueのGlueジョブを使ってApache Hudiを動かすことが可能です。

今回はこちらの方法を試してみました。

AWS GlueでApache Hudiを動かす

Apache Hudiを動かすために、AWS環境上にリソースを作成していきます。

IAMロール

「glue.amazonaws.com」と信頼関係を持つIAMロールを作成し、以下のポリシーを割り当てます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:*", "logs:*"],
            "Resource": "*"
        }
    ]
}

S3バケット

3つのバケットを予め作成しておきます。

  • Glueスクリプト保存用
  • Glueの一時ディレクトリ用
  • Apache Hudi本体のjarライブラリ配置用
  • Hudiのデータ管理用

作成後、「Apache Hudi本体のjarライブラリ配置用」バケットには以下のライブラリを配置しておきます。

Glueジョブ

マネジメントコンソールからGlueジョブを作成します。

以下設定のジョブを作成しました。

(特に記載が無い設定はデフォルトのままです)

設定名
名前 ApacheHudiSampleJob
IAMロール 前述の手順で作成したIAMロール名
Type Spark
Glue version Spark 2.4, Python 3 with improved job startup times (Glue Version 2.0)
このジョブ実行 ユーザーが作成する新しいスクリプト
スクリプトが保存されている S3 パス 前述の手順で作成したS3バケット
一時ディレクトリ 前述の手順で作成したS3バケット
依存 JARS パス 前述の手順で配置した2つのライブラリパスをカンマ区切りで指定

スクリプトを記述

Quick Startのサンプルを利用します。

今回はデータのimportとクエリを試してみます。

Quich Startの内容と、Glueジョブ設定を組み合わせた以下のスクリプトを設定します。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()

tableName = 'hudi_sample_table' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2 # insert時の並列数
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)


# 書き込んだ結果をSpark SQLでクエリ
tripsSnapshotDF = spark. \
  read. \
  format("hudi"). \
  load(basePath + "/*/*/*/*")

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

job.commit()

Glueジョブの実行

ジョブを実行します。

正常終了した後にCloudwatch Logを確認すると、データのクエリが正しく行えていることを確認できます。

また、Hudiのデータ管理用S3バケットを確認すると、Hudiが生成したparquetファイル、メタデータファイルを確認することが可能です。

$ aws s3 ls --recursive s3://cm-moriwaki-hudi-sample--datalake
2020-11-30 06:50:42          0 hudi_sample_table/.hoodie/.aux/.bootstrap/.fileids_$folder$
2020-11-30 06:50:42          0 hudi_sample_table/.hoodie/.aux/.bootstrap/.partitions_$folder$
2020-11-30 06:50:42          0 hudi_sample_table/.hoodie/.aux/.bootstrap_$folder$
2020-11-30 06:50:42          0 hudi_sample_table/.hoodie/.aux_$folder$
2020-11-30 06:50:41          0 hudi_sample_table/.hoodie/.temp_$folder$
2020-11-30 06:50:53       3788 hudi_sample_table/.hoodie/20201129215037.commit
2020-11-30 06:50:45          0 hudi_sample_table/.hoodie/20201129215037.commit.requested
2020-11-30 06:50:46       2192 hudi_sample_table/.hoodie/20201129215037.inflight
2020-11-30 06:50:41          0 hudi_sample_table/.hoodie/archived_$folder$
2020-11-30 06:50:43        238 hudi_sample_table/.hoodie/hoodie.properties
2020-11-30 06:50:41          0 hudi_sample_table/.hoodie_$folder$
2020-11-30 06:50:51         93 hudi_sample_table/americas/brazil/sao_paulo/.hoodie_partition_metadata
2020-11-30 06:50:52     437450 hudi_sample_table/americas/brazil/sao_paulo/23d41e97-46ae-4380-bc0e-eee2a282f490-0_0-8-28_20201129215037.parquet
2020-11-30 06:50:50          0 hudi_sample_table/americas/brazil/sao_paulo_$folder$
2020-11-30 06:50:50          0 hudi_sample_table/americas/brazil_$folder$
2020-11-30 06:50:51         93 hudi_sample_table/americas/united_states/san_francisco/.hoodie_partition_metadata
2020-11-30 06:50:52     437835 hudi_sample_table/americas/united_states/san_francisco/7e942885-59dd-47d2-b7e2-707d2a603594-0_1-8-29_20201129215037.parquet
2020-11-30 06:50:50          0 hudi_sample_table/americas/united_states/san_francisco_$folder$
2020-11-30 06:50:50          0 hudi_sample_table/americas/united_states_$folder$
2020-11-30 06:50:50          0 hudi_sample_table/americas_$folder$
2020-11-30 06:50:51         93 hudi_sample_table/asia/india/chennai/.hoodie_partition_metadata
2020-11-30 06:50:52     437227 hudi_sample_table/asia/india/chennai/edb8efc4-b672-4d4c-9254-f3b276d53387-0_2-8-30_20201129215037.parquet
2020-11-30 06:50:50          0 hudi_sample_table/asia/india/chennai_$folder$
2020-11-30 06:50:50          0 hudi_sample_table/asia/india_$folder$
2020-11-30 06:50:50          0 hudi_sample_table/asia_$folder$
2020-11-30 06:50:40          0 hudi_sample_table_$folder$

パーティションが切られていることもわかりますね。

まとめ

AWS Glueを使用してApache Hudiを動かしてみました。

今回は基本的な操作のみ試しましたが、次回はデータリカバリ、ロールバック機能などを試そうと思います。

参考

※Apache®、Apache Hudi、Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。