この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部の森脇です。
以前の記事にて、AWS上でApache Hudiを動かす方法を試しました。
今回はApache HudiとGlueデータカタログを連携させてみます。
Glueデータカタログと連携することで、「Apache Hudiでテーブルを作成し、Amazon Athenaで参照する」といった使い方が可能となります。
早速試してみます。
基本方針
連携するためには、以下2つの設定を変更します。
- AWS Glueのジョブ設定
- Apache Hudiのoption
AWS Glueジョブ設定には、「GlueデータカタログをHiveメタストアとして利用する」オプションがあります。
また、Apache Hudiには、「Hiveメタストアと連携する」オプションが存在します。
これらを両方利用することで、Glueデータカタログと連携することが可能です。
試してみる
以前の記事で検証した環境をベースに利用します。
事前に準備するAWSリソースなどは、上記を参照ください。
ベース設定からいくつか変更を行い検証します。
IAMロール
Glueデータカタログにアクセスできるようにするため、「glue:*」を許可するようポリシーを更新します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*",
"logs:*",
"glue:*"
],
"Resource": "*"
}
]
}
Glueジョブ
前回作成したジョブの「ジョブの編集」をクリックし、「ジョブパラメーター」設定を追加します。
キー名 | 値 |
---|---|
--enable-glue-datacatalog | t |
この設定で、GlueジョブがGlueデータカタログをHiveメタストアとして使用するようになります。
スクリプト
ジョブのスクリプトを変更します。
Hiveメタストアと連携するために「hoodie.datasource.hive_sync.*」設定をoptionに指定します。
また、今回はテーブルへの書き込みだけ確認すればよいため、クエリ実行の処理は除きました。
以下のようになります。
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()
tableName = 'hudi_sample_table' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'
# Hudiのオプション
hudi_options = {
'hoodie.table.name': tableName, # テーブル名
# 書き込みオプション
'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名
'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名
'hoodie.datasource.write.table.name': tableName, # テーブル名
'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
'hoodie.upsert.shuffle.parallelism': 2, # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数
# データカタログ連携オプション(hive_sync)
'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名
'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', # パーティションの方式を指定
'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。
}
# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
job.commit()
Glueジョブの実行
変更後のGlueジョブを実行し、正常終了したら完了です。 マネジメントコンソールでGlueデータカタログを見ると、「default」データベースに「hudi_sample_table」テーブルが作成されていることが確認できました。
パーティションも正しく認識されていますね。
この状態でAthenaを使ってみたところ、正しくクエリを実行することもできました!
まとめ
Apache Hudiを利用することで、テーブル作成時にGlueデータカタログに情報を登録することができました。
Athenaを使う際に便利そうですね。
今後もHudiの機能を試していきたいと思います。
参考
※Apache®、Apache Hudi、Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。