Amazon AthenaのIcebergテーブルのbucket関数で、ハッシュによるパーティション分割を利用してみる

パーティション変換関数のbucket関数を使った、ハッシュによるhidden partitioningについて、Amazon Athenaでのメリットや気になる挙動について確認しました。
2023.08.22

データアナリティクス事業本部 機械学習チームの鈴木です。

Icebergテーブルではパーティション変換関数を使用して既存のカラムからパーティションキーを計算することが可能です。(hidden partitioning)

そのうちbucket関数を使うと、設定だけでパーティションキーとしてハッシュ値を計算してくれて便利なので、実際に試してみました。

bucket関数を使うと嬉しいこと

カーディナリティの高いカラムをもとにパーティション分割したい場合に便利です。

このような場合、これまではAthenaのHiveのテーブルでは、例えば以下のようにする必要がありました。

  1. SQLでパーティション用に粒度を粗くしたカラムを新しく作り、それをパーティションキーとする。
  2. CTASでバケッティングを使う。
  3. パーティションキーを含むパスでS3にオブジェクトを配置し、injectionを使ってWhere句でオブジェクトキーを指定する。

1つ目のケースは、データ作成のための処理にパーティションキーを作成するための処理を入れ込む必要がありました。難しい処理ではないですが、システム的な処理をSQLに含めることになるので、例えば後から参加したメンバーがみたときになんのための処理なのか分からないかもしれません。

2つ目のケースは、バケッティングがCTASでのみサポートされるため、テーブル作成のたびにCTASをする必要がありました。全量洗い替えかスナップショットを作るような処理となるため、既存のテーブルにデータを追加したい場合には適していませんでした。

3つ目のケースは、高いカーディナリティのカラムをパーティションキーに指定できるものの、パーティション射影の型の仕様で複数のパーティションを跨いでスキャンができませんでした。また、S3はリクエストに料金がかかるため細かい粒度でオブジェクトを作ってしまうと課金が高くなってしまうリスクがありました。

bucket関数を使うことでこのような心配をしなくても、Iceberg側の仕組みでハッシュ値を計算しパーティション分割のキーに使うことができます。

この記事では、例を通して挙動やメリットを確認できればと思います。

データの準備

2023/8/1から2023/8/9の間に温度を測定するデバイスで1日の平均温度を測定したという想定で、ダミーのデータを作成しました。

例えば以下のようなものです。device_idカラムはUUIDで、このIDでデバイスを識別する想定です。

device_id,measurement_date,average_tempelature
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230801,33.15517397540419
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230802,28.80116589256078
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230803,33.47565968592869
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230804,26.019512991684145
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230805,27.026381797648526
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230806,28.756432628583468
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230807,28.445851188379837
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230808,34.29138745895433
54093d70-b58a-4a64-bbdd-7e140ca834f4,20230809,27.409012824331818
bfbb1b80-47fd-4657-a599-230becb40852,20230801,26.223212581867145
bfbb1b80-47fd-4657-a599-230becb40852,20230802,26.658852480915485
bfbb1b80-47fd-4657-a599-230becb40852,20230803,31.92728480493228
bfbb1b80-47fd-4657-a599-230becb40852,20230804,29.085712467877148

このファイルをS3バケットにアップロードした後、以下のようにHive形式でAthenaからGlueテーブルを作成しました。

-- S3バケット名は自身のものに置き換えてください。

CREATE EXTERNAL TABLE `device_data_csv`(
  device_id string,
  data string,
  average_tempelature float) 
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://S3バケット名/device_data_csv'
TBLPROPERTIES (
  'classification'='csv', 
  'columnsOrdered'='false', 
  'compressionType'='none', 
  'delimiter'=',', 
  'skip.header.line.count'='1')

なお、今回は120個のデバイスIDに関連するデータを生成しました。

デバイスIDの数

やってみた

1. パーティション分割したオブジェクトの作成

まず、以下のようにIcebergの形式でテーブルを作成しました。

-- S3バケット名は自身のものに置き換えてください。
-- 下記ドキュメントのCREATE TABLE ステートメントの例を参考にしました
-- https://docs.aws.amazon.com/ja_jp/athena/latest/ug/querying-iceberg-creating-tables.html

CREATE TABLE device_data_iceberg (
  device_id string,
  data string,
  average_tempelature float) 
PARTITIONED BY (bucket(16,device_id)) 
LOCATION 's3://S3バケット名/iceberg-device-data' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912',
  'optimize_rewrite_delete_file_threshold'='10'
)

先ほど作ったHiveのテーブルから取得したデータを、このIcebergのテーブルに入れてみて、S3上にどのようにオブジェクトが作成されるか確認しました。

INSERT INTO device_data_iceberg (device_id,"data",average_tempelature)
SELECT device_id,"data",average_tempelature
FROM device_data_csv

S3バケットを確認すると、確かに以下のように16のフォルダに分けてオブジェクトが作成されていました。これはPARTITIONED BYbucket(16,device_id)と指定したので意図通りですね。

Insertでできたオブジェクト

試しに一つオブジェクトを確認してみると以下のようになっていました。

作成されたオブジェクト

SQLからパーティションの情報を確認することも可能でした。

SELECT * FROM "device_data_iceberg$partitions";

パーティションの確認

2. 同一パーティション内でのデータスキャン量の確認

バケッティングの場合は一つのパーティション内でさらにスキャン量が削減されましたが、Icebergのbucket関数の場合はそうはならないと考えられます。念の為確認してみました。

-- S3バケット名は自身のものに置き換えてください。
CREATE TABLE device_data_iceberg_1 (
  device_id string,
  data string,
  average_tempelature float) 
PARTITIONED BY (bucket(1,device_id)) 
LOCATION 's3://S3バケット名/iceberg-device-data_1' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912',
  'optimize_rewrite_delete_file_threshold'='10'
)

以下のようにデータを挿入します。これで一つのパーティションに全てのデータを配置しました。

INSERT INTO device_data_iceberg_1 (device_id,"data",average_tempelature)
SELECT device_id,"data",average_tempelature
FROM device_data_csv

まずデバイスIDを指定しない場合の検索です。6.63KBのスキャンでした。

デバイスID指定なし

次にデバイスIDを指定した場合の検索です。9.33KBのスキャンでした。むしろちょっと増えてしまいましたね。

デバイスID指定あり

想定どおり、パーティションに含まれるデータは全てスキャンされるようでした。

3. パーティションへのデータの追加

最後に、既存のパーティションにどのようにデータが追加されるのか確認しました。device_data_icebergテーブルに以下のように適当なデータを入れてみました。

INSERT INTO device_data_iceberg
VALUES ('64093d70-b58a-4a64-ccdd-7e140ca834f7','20230801',27.758596)

パーティションの情報を確認しました。

SELECT * FROM "cm-nayuts-sample-db"."device_data_iceberg$partitions";

赤枠で囲ったパーティションにデータが追加されたことが分かりました。

追加されたデータの確認

最後に

Icebergテーブルのパーティション変換関数であるbucket関数を使った、ハッシュ値でのパーティション分割の例を紹介しました。

テーブルの設定で簡単にハッシュによるパーティションキーの計算ができるのでとても便利でした。

ほかに参考にした資料