[Apache Hudi] Datadogと連携してメトリクスを表示する

2021.08.18

データ・アナリティクス事業本部の森脇です。

Apache Hudiには0.8.0でメトリクス機能が追加されました。

この機能を使うことで、コミットやロールバックに関するデータを3rdツールと連携することが可能です。

連携先にはGraphiteJMXDatadog等が利用可能です。

今回はDatadogと連携し、メトリクスがどのように表示されるか試してみました。

Glueジョブ

例によってGlueジョブを使ってApache Hudiを動かします。

Datadogの接続情報はMetrics configsに設定することで連携が可能です。

Agentのようなものをインストールする必要はなく、Apache Hudiのみで連携可能です。

import sys
import random
import boto3
import botocore.exceptions

from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# クイックスタート用ライブラリを利用してデータ、及びデータフレームを作成
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

def genDF():
    inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(random.randint(50, 100)))
    df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    return df

def existsTable(databaseName, tableName):
    glue = boto3.client("glue")
    exists = False
    try:
        resp = glue.get_table(
            DatabaseName="default",
            Name="hudi_metrics_rt",
        )
    except botocore.exceptions.ClientError as err:
        if err.response["Error"]["Code"] == "EntityNotFoundException":
            exists = False
    else:
        exists = True
    return exists
    

databaseName = "default"
tableName = 'hudi_metrics_sample' # テーブル名
bucketName = 'cm-moriwaki-hudi-sample--datalake' # 前述の手順で作成したHudiデータ格納用S3バケット
basePath = f's3://{bucketName}/{tableName}'

# Hudiのオプション
hudi_options = {
  'hoodie.table.name': tableName, # テーブル名
  # 書き込みオプション
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
  'hoodie.compact.inline': 'false',
  'hoodie.datasource.write.recordkey.field': 'uuid', # レコードキーのカラム名
  'hoodie.datasource.write.partitionpath.field': 'partitionpath', # パーティション対象のカラム名
  'hoodie.datasource.write.table.name': tableName, # テーブル名
  'hoodie.datasource.write.operation': 'upsert', # 書き込み操作種別
  'hoodie.datasource.write.precombine.field': 'ts', # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
  'hoodie.upsert.shuffle.parallelism': 2,  # upsert時の並列数(今回はinsert操作なので多分不要だがクイックスタートに記載があるのでそのまま利用)
  'hoodie.insert.shuffle.parallelism': 2, # insert時の並列数

  # データカタログ連携オプション(hive_sync)
  'hoodie.datasource.hive_sync.enable': 'true', # 連携を有効にする
  'hoodie.datasource.hive_sync.database': databaseName, # 連携先のデータベース名
  'hoodie.datasource.hive_sync.table': tableName, # 連携先のテーブル名
  'hoodie.datasource.hive_sync.partition_fields': 'contient,country,city', # パーティションのフィールド名
  'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',  # パーティションの方式を指定
  'hoodie.datasource.hive_sync.use_jdbc': 'false', # jdbcを利用すると接続エラーになったのでfalseにする。

  # メトリクスオプション(for Datadog)
  'hoodie.metrics.on': 'true', # メトリクスをOn
  'hoodie.metrics.reporter.type': 'DATADOG', # 連携対象はDatadog
  'hoodie.metrics.datadog.report.period.seconds': '30', # レポート間隔
  'hoodie.metrics.datadog.api.site': 'US', # Datadogサイトのリージョン(US or EU)
  'hoodie.metrics.datadog.api.key': 'xxxxxxxxxxxxxxxxxxxxxxxxx', # APIキー
  'hoodie.metrics.datadog.api.key.skip.validation': 'false', # trueの場合、データ送信時の事前Validationをskipする
  'hoodie.metrics.datadog.metric.prefix': 'sample', # メトリクスのprefix
  'hoodie.metrics.datadog.metric.host': 'apache_hudi.glue', # メトリクスのホスト
}


# テーブルが存在しない場合は新規作成、存在する場合は追記
mode = "overwrite"
if existsTable(databaseName, tableName):
    mode = "append"

# データの書き込み
genDF().write.format("hudi"). \
  options(**hudi_options). \
  mode(mode). \
  save(basePath)

job.commit()

テーブルが存在しない場合は作成し、存在する場合はNレコードを追記するジョブを作成しました。

このジョブを複数回実行し、Datadogで結果を確認してみます。

メトリクスを確認してみる

DatadogのMetrics Explorerを見てみます。

Metricsがたくさん作成されています。

一覧:

sample.hudi_metrics_sample.HoodieWrapperFileSystem.listStatus
sample.hudi_metrics_sample.HoodieWrapperFileSystem.listStatus.totalDuration
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.create
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.create.totalDuration
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.delete
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.delete.totalDuration
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.getFileStatus
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.getFileStatus.totalDuration
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.listStatus
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.listStatus.totalDuration
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.write
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.write.totalBytes
sample.hudi_metrics_sample.HoodieWrapperFileSystemMetaFolder.write.totalDuration
sample.hudi_metrics_sample.TimelineService.LATEST_DATA_FILES_BEFORE_ON_INSTANT
sample.hudi_metrics_sample.TimelineService.LATEST_PARTITION_DATA_FILE
sample.hudi_metrics_sample.TimelineService.LATEST_SLICES_BEFORE_ON_INSTANT
sample.hudi_metrics_sample.TimelineService.PEDING_COMPACTION_OPS
sample.hudi_metrics_sample.TimelineService.TOTAL_API_CALLS
sample.hudi_metrics_sample.TimelineService.TOTAL_API_TIME
sample.hudi_metrics_sample.TimelineService.TOTAL_CHECK_TIME
sample.hudi_metrics_sample.TimelineService.TOTAL_HANDLE_TIME
sample.hudi_metrics_sample.TimelineService.TOTAL_REFRESH_TIME
sample.hudi_metrics_sample.TimelineService.WRITE_VALUE_CNT
sample.hudi_metrics_sample.TimelineService.WRITE_VALUE_TIME
sample.hudi_metrics_sample.deltacommit.commitTime
sample.hudi_metrics_sample.deltacommit.duration
sample.hudi_metrics_sample.deltacommit.totalBytesWritten
sample.hudi_metrics_sample.deltacommit.totalCompactedRecordsUpdated
sample.hudi_metrics_sample.deltacommit.totalCreateTime
sample.hudi_metrics_sample.deltacommit.totalFilesInsert
sample.hudi_metrics_sample.deltacommit.totalFilesUpdate
sample.hudi_metrics_sample.deltacommit.totalInsertRecordsWritten
sample.hudi_metrics_sample.deltacommit.totalLogFilesCompacted
sample.hudi_metrics_sample.deltacommit.totalLogFilesSize
sample.hudi_metrics_sample.deltacommit.totalPartitionsWritten
sample.hudi_metrics_sample.deltacommit.totalRecordsWritten
sample.hudi_metrics_sample.deltacommit.totalScanTime
sample.hudi_metrics_sample.deltacommit.totalUpdateRecordsWritten
sample.hudi_metrics_sample.deltacommit.totalUpsertTime
sample.hudi_metrics_sample.finalize.duration
sample.hudi_metrics_sample.finalize.numFilesFinalized
sample.hudi_metrics_sample.index.UPSERT.duration
sample.hudi_metrics_sample.index.lookup.duration

試しにsample.hudi_metrics_sample.deltacommit.totalInsertRecordsWrittenを表示してみます。

コミット毎に追加(INSERT)されたレコード数の推移を確認することができました。

まとめ

Apache Hudiのメトリクス機能を利用することで、Datadogと連携することができました。

デバッグや性能監視に非常に便利そうだと感じました。

参考

※Apache®、Apache Hudi、Hudi、およびフレームロゴは、米国および、または他の国におけるApache Software Foundationの登録商標または商標です。これらのマークの使用は、Apache Software Foundationによる承認を意味するものではありません。