AWS GlueのIcebergテーブルに対して自動的にコンパクションを実行する機能を使ってみた

AWS GlueのIcebergテーブルに対して自動的にコンパクションを実行する機能を使ってみた

GlueがサポートしたIcebergテーブルに対するautomatic compactionの機能がどのように使えるか確認したのでご紹介します。
Clock Icon2023.11.19

データアナリティクス事業本部 機械学習チームの鈴木です。

先日、Glueデータカタログ上からIceberg形式のGlueテーブルに対して自動でファイルのコンパクションを実行する機能のリリースがアナウンスされていたので試してみました。

例えば機械学習の文脈でも、データレイク上のIcebergテーブルから特徴量を取得し利用することがあります。『Amazon SageMaker Feature Store が Apache Iceberg テーブルフォーマットをサポートするように』のようなユースケースです。このアナウンスでも紹介されているように、小さなデータファイルをまとめて大きなファイルに置き換える仕組みを自動化する機能がGlueデータカタログでリリースされました。

自動的にコンパクションを実行する機能について

Icebergテーブルへのトランザクション書き込みによって生成された個々の小さなファイルを、自動的にいくつかの大きなファイルに圧縮する機能になります。

Athenaのガイドでは、Icebergテーブルの最適化について、現状OPTIMIZEVACUUMの2つのメンテナンスコマンドが提供されています。

いずれも手動実行のためのコマンドになり、定期実行するためには専用のジョブを作成しスケジュールしておく必要があります。

今回リリースされた機能は、トランザクションにより作成された小さなファイルをGlueの機能で自動的に統合してくれるものとなります。

なお、先にOPTIMIZEVACUUMのメンテナンスコマンドを引き合いに出しましたが、この機能はOPTIMIZEに類似しているものの、記事執筆時点では厳密には異なる機能だと理解しています。

Amazon AthenaではIcebergテーブルの更新には、『Updating Iceberg table data - Amazon Athena』に記載のようにMerge-On-Read (MOR)を採用しています。更新により発生した削除データは、圧縮実行時にマージされます。

冒頭のブログの『Things to know』によると、この自動的なコンパクション機能では、削除ファイルはまとめないことが記載されていますが、OPTIMIZEコマンドでは削除ファイルをデータファイルにまとめます。

Icebergテーブルの最適化のイメージについては、以下のブログ記事が参考になりました。

事前準備 

では、自動的なコンパクション機能を試していきたいですが、そのためにまず、事前準備として検証に必要なリソースを作成しました。

1. Icebergテーブルの作成

コンパクションの対象とするIcebergテーブルを作成しました。

以下のようにフォーマットはPARQUET、SNAPPYの圧縮方式のIcebergテーブルを作成しました。特にデータを配置するS3バケット名は自身のものに修正してください。

CREATE TABLE target_table_iceberg_parquet (
  item_id string,
  item_value string)
LOCATION 's3://データを配置するS3バケット名/target_table_iceberg_parquet'
TBLPROPERTIES (
  'table_type'='iceberg',
  'write_compression'='snappy',
  'format'='PARQUET'
);

続いて以下のようにしてデータを格納しておきました。

INSERT INTO target_table_iceberg_parquet (item_id, item_value)
SELECT item_id, item_value
FROM sample_table;

なお、この機能の対象となるIcebergテーブルの要件については以下のページをご確認ください。

ここで以下のようにテーブルが参照するファイルを確認すると、1ファイルであることが分かりました。

SELECT * FROM "target_table_iceberg_parquet$files"

INSERT後ファイル数

2. IAMロールの作成

自動コンパクション設定時にGlueデータカタログで指定するIAMロールを作成しました。

以下のガイドを参考に作成しました。なお、今回はLakeFormation下の環境ではなく、ただのGlueデータカタログとS3バケットを使っているだけの環境を想定しています。

まず、以下のような許可ポリシーを作成しました。特にデータを配置するS3バケット名アカウントIDGlueデータベース名は自身のものに修正してください。リージョンは東京リージョンを想定しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::データを配置するS3バケット名/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::データを配置するS3バケット名"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:UpdateTable",
                "glue:GetTable"
            ],
            "Resource": [
                "arn:aws:glue:ap-northeast-1:アカウントID:table/Glueデータベース名/target_table_iceberg_parquet",
                "arn:aws:glue:ap-northeast-1:アカウントID:database/Glueデータベース名",
                "arn:aws:glue:ap-northeast-1:アカウントID:catalog"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:ap-northeast-1:アカウントID:log-group:/aws-glue/iceberg-compaction/logs:*"
        }
    ]
}

以下のようにポリシーの作成からポリシーを作成しました。

IAMポリシーの作成

次に、ロールを作成から、Glueをユースケースに、このポリシーをアタッチしたIAMロールを作成しました。名前はもちろんなんでも構いませんが、後の説明で分かりやすいよう、cm-nayuts-glue-table-optimization-roleという名前にしました。

IAMロールの作成

以下のようにIAMロールができました。

アタッチされた許可ポリシー

信頼ポリシー

コンパクションの検証

次に、実際に小さいファイルを作成した後、自動的なコンパクションを設定して、どのようにデータファイルが圧縮されるか確認します。

1. 小さいファイルの作成

何回かトランザクションが実行されたと想定して、以下のような適当なUPDATE文を一つづつ実行しました。

UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='1';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='10';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='100';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='30';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='3331';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='70';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='500';
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='9995';

実行後、以下のようにテーブルが参照するファイルを確認すると、数が増えて9ファイルであることが分かりました。

SELECT * FROM "target_table_iceberg_parquet$files"

UPDATE後ファイル数

2. Glueテーブルでコンパクションの有効化・コンパクションの実行

Glueの画面から該当するGlueテーブルを開き、ページ下部のTable optimizationタブよりEnable compactionボタンをクリックしました。

Enable compactionボタンをクリック

Enable compaction画面に遷移するので、IAM roleとして先ほど作成したIAMロールを選択し、Enable compactionボタンをクリックしました。

Enable compaction画面

再度Glueテーブルを開くと、Compaction statusがEnabledになっていたので少し待ちました。

Compaction statusがEnabled

さらにしばらくすると、Compaction statusがSuccessになりました。

Compaction statusがSuccess

Compaction historyタブから履歴を確認することも可能でした。

3. 結果の確認

以下のようにテーブルが参照するファイルを確認すると、2ファイルであることが分かりました。

SELECT * FROM "target_table_iceberg_parquet$files"

コンパクション後ファイル数

関連機能の確認

1. メトリクスの確認

CloudWatchメトリクスより、コンパクションに使ったDPU数などのメトリクスを確認することができました。

メトリクスの確認

2. ログの確認

IAMポリシーでも定義しましたが、CloudWatch Logsに実行したログが出力されました。

ログの確認

3. OPTIMIZEコマンドとの違い

コンパクション実行後のテーブルにOPTIMIZEコマンドを実行して違いを確認してみました。

OPTIMIZE target_table_iceberg_parquet REWRITE DATA USING BIN_PACK

実行後にファイル件数を確認し、1ファイルであることが分かりました。

OPTIMIZE後の件数

自動的なコンパクションでスキップされていたデータがまとめられたものと考えられます。

最後に

GlueがサポートしたIcebergテーブルに対する自動的なコンパクション機能について設定方法例や動きについてご紹介しました。Icebergテーブルへのサポートがますます充実してきており、AthenaおよびGlueでぜひ採用したくなります!

参考になりましたら幸いです。

参考

APIからも利用できるアナウンスがされていましたので、興味のある方はマネジメントコンソールだけでなく、AWS CLIや各種SDKからも利用できるかご確認ください。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.