AWS上でApache Hudiを動かし、Glueデータカタログと連携する

AWS上でApache Hudiを動かし、Glueデータカタログと連携する

Clock Icon2020.11.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の森脇です。

以前の記事にて、AWS上でApache Hudiを動かす方法を試しました。

今回はApache HudiとGlueデータカタログを連携させてみます。

Glueデータカタログと連携することで、「Apache Hudiでテーブルを作成し、Amazon Athenaで参照する」といった使い方が可能となります。

早速試してみます。

基本方針

連携するためには、以下2つの設定を変更します。

  • AWS Glueのジョブ設定
  • Apache Hudiのoption

AWS Glueジョブ設定には、「GlueデータカタログをHiveメタストアとして利用する」オプションがあります。

また、Apache Hudiには、「Hiveメタストアと連携する」オプションが存在します。

これらを両方利用することで、Glueデータカタログと連携することが可能です。

試してみる

以前の記事で検証した環境をベースに利用します。

事前に準備するAWSリソースなどは、上記を参照ください。

ベース設定からいくつか変更を行い検証します。

IAMロール

Glueデータカタログにアクセスできるようにするため、「glue:*」を許可するようポリシーを更新します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*",
                "logs:*",
                "glue:*"
            ],
            "Resource": "*"
        }
    ]
}

Glueジョブ

前回作成したジョブの「ジョブの編集」をクリックし、「ジョブパラメーター」設定を追加します。

キー名
--enable-glue-datacatalog t

この設定で、GlueジョブがGlueデータカタログをHiveメタストアとして使用するようになります。

スクリプト

ジョブのスクリプトを変更します。

Hiveメタストアと連携するために「hoodie.datasource.hive_sync.*」設定をoptionに指定します。

また、今回はテーブルへの書き込みだけ確認すればよいため、クエリ実行の処理は除きました。

以下のようになります。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()

tableName = 'hudi_sample_table' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'insert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数

  # データカタログ連携オプション(hive_sync)
  'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
  'hoodie.datasource.hive_sync.database': 'default', # 連携先のデータベース名
  'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
  'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名
  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',  # パーティションの方式を指定
  'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。
}


# データの書き込み
# 「mode」にoverwriteが指定されている場合、テーブルが作成する場合に再作成する
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)


job.commit()

Glueジョブの実行

変更後のGlueジョブを実行し、正常終了したら完了です。 マネジメントコンソールでGlueデータカタログを見ると、「default」データベースに「hudi_sample_table」テーブルが作成されていることが確認できました。

パーティションも正しく認識されていますね。

この状態でAthenaを使ってみたところ、正しくクエリを実行することもできました!

まとめ

Apache Hudiを利用することで、テーブル作成時にGlueデータカタログに情報を登録することができました。

Athenaを使う際に便利そうですね。

今後もHudiの機能を試していきたいと思います。

参考

※Apache®、Apache Hudi、Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.