Amazon Athena Apache IcebergテーブルフォーマットによるACID Transactionを試してみました! #reinvent

2021.12.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部コンサルティングチームの石川です。Apache Icebergテーブルフォーマットを用いて、AthenaからACIDにINSERT、UPDATE、DELETE、タイムトラベル、オプティマイズの操作を可能にする新機能のパプリックプレビューが開始しました。Amazon S3データに対して信頼性の高い行レベルの変更が可能になりますので、実際に試してみます。

AthenaのApache Iceberg機能はプレビューリリースであり、変更される可能性があります。潜在的なデータの損失または破損を回避するために、本番データセットでこのプレビューを使用しないでください。

Apache Icebergとは

今回は、Amazon AthenaがサポートしたApache Icebergというテーブルフォーマットは、巨大なペタバイト規模のテーブル用に設計されたオープンなテーブルフォーマットです。テーブルフォーマットは、テーブルを構成するすべてのファイルをどのように管理、整理、追跡するかを決定することです。この中には、物理データファイル(ParquetやAvroなど)やテーブルの管理情報が含まれます。

IcebergのACID Transactionを試す!

AWS Lake Formationを有効にしていない状態の米国東部 (バージニア北部)で、下記の動作を確認しました。

データレイク用のS3バケット・フォルダの準備

一般的なデータレイクと同様にバケットを用意します。今回はバケットの下にiceberg_tableというフォルダを作成しました。

プレビュー中の機能を有効するWorkspaceを作成

プレビュー中の機能を有効にするには、AmazonAthenaIcebergPreviewという名前のWorkspaceを使用してクエリを実行する必要があります。おまじないのようなものだと思ってください。下記の以外の項目はデフォルトのままで構いません。

Query Editorを開き、WorkgroupからAmazonAthenaIcebergPreviewを選択します。

準備ができたので、動作を確認します。

テーブルの作成

table_typeにはICEBERG、formatはparquetを指定します。なお、2つ目のパーティションキーに指定したbucket(16,id)は、id_bucketというパーティションキーに変換されます。

CREATE TABLE iceberg_table (
  id int,
  data string,
  category string) 
PARTITIONED BY (category, bucket(16,id)) 
LOCATION 's3://<mybucket>/iceberg_table/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'compaction_bin_pack_target_file_size_bytes'='536870912' 
)

テーブルを作成した直後は、S3のフォルダにメタデータのみが登録されています。

% aws s3 ls s3://<mybucket>/iceberg_table/ --recursive
2021-12-06 21:30:30          0 iceberg_table/
2021-12-07 13:40:55       1417 iceberg_table/metadata/00000-60ff8e64-b2c6-4cc1-89b1-dc740816cf2d.metadata.json

レコードのINSERT

以下の2行を順番にINSERTします。

INSERT INTO sampledb.iceberg_table (id, data, category) values(1, 'mac-intel', 'macbook');
INSERT INTO sampledb.iceberg_table (id, data, category) values(2, 'mac-m1', 'macbook');

一行目をINSERTするのに6.225秒かかりました。2行目も概ね同じくらい時間がかかりました。

レコードのSELECTとタイムトラベル

INSERTしたレコードを確認します。

SELECT * from sampledb.iceberg_table;

タイムトラベル(Oracleのフラッシュバッククエリのようなもの)という機能を使って、過去に遡ってクエリすることもできます。「一日前」に指定すると先程作ったばかりのテーブルなので、0レコードとなります。

SELECT * FROM sampledb.iceberg_table FOR SYSTEM_TIME AS OF (current_timestamp - interval '1' day)

ここでデータがどのような状態で格納されているのか確認したいと思います。タイムスタンプから、データをINSERTするたびに2つのAVROファイル、1つのParquetファイル、1つのjsonファイルが作成されていることが確認できます。

% aws s3 ls s3://<mybucket>/iceberg_table/ --recursive
2021-12-06 21:30:30          0 iceberg_table/
2021-12-07 13:42:38        453 iceberg_table/data/3494c159//iceberg_table/category=macbook/id_bucket=4/82f67f06-a0e1-4522-ba48-3cbe6d3a8be6.parquet
2021-12-07 13:45:35        438 iceberg_table/data/4b0be337//iceberg_table/category=macbook/id_bucket=4/dde0a679-84f8-46f4-891e-33355a25f396.parquet
2021-12-07 13:40:55       1417 iceberg_table/metadata/00000-60ff8e64-b2c6-4cc1-89b1-dc740816cf2d.metadata.json
2021-12-07 13:42:39       2336 iceberg_table/metadata/00001-b6262f06-8805-40c6-9b28-7b1d361f3705.metadata.json
2021-12-07 13:45:36       3289 iceberg_table/metadata/00002-79008873-1a1b-44db-830a-cdeae6c87dde.metadata.json
2021-12-07 13:42:38       7033 iceberg_table/metadata/958b8acb-2800-473b-b55e-486333344762-m0.avro
2021-12-07 13:45:35       7033 iceberg_table/metadata/b8518cfe-ffeb-4b58-880b-81ffe1156b85-m0.avro
2021-12-07 13:42:39       4270 iceberg_table/metadata/snap-1296825016097872104-1-958b8acb-2800-473b-b55e-486333344762.avro
2021-12-07 13:45:35       4340 iceberg_table/metadata/snap-6943776344382491461-1-b8518cfe-ffeb-4b58-880b-81ffe1156b85.avro

レコードのUPDATE

idが2のレコードを更新します。

UPDATE sampledb.iceberg_table SET data='mac-m1-max' WHERE id = 2;

正常に更新されました。レコードを確認します。

SELECT * from sampledb.iceberg_table;

レコードのDELETE

idが1のレコードを削除します。

DELETE FROM iceberg_table WHERE id=1;

正常に削除されました。レコードを確認します。

SELECT * from sampledb.iceberg_table;

もう一度、ここでデータがどのような状態で格納されているのか確認したいと思います。データが更新されるたびにファイルが作成されていることが確認できます。

% aws s3 ls s3://<mybucket>/iceberg_table/ --recursive
2021-12-06 21:30:30          0 iceberg_table/
2021-12-07 14:00:57        943 iceberg_table/data/1e23f44b//iceberg_table/category=macbook/id_bucket=4/delete/position/c9a1ddd3-7b1b-4ca3-be6d-fc3218746c66.parquet
2021-12-07 14:00:58        458 iceberg_table/data/227a0b59//iceberg_table/category=macbook/id_bucket=4/821f0cd3-0fc4-4f7e-9e32-3c5b50c73d8f.parquet
2021-12-07 13:42:38        453 iceberg_table/data/3494c159//iceberg_table/category=macbook/id_bucket=4/82f67f06-a0e1-4522-ba48-3cbe6d3a8be6.parquet
2021-12-07 13:45:35        438 iceberg_table/data/4b0be337//iceberg_table/category=macbook/id_bucket=4/dde0a679-84f8-46f4-891e-33355a25f396.parquet
2021-12-07 14:11:44        941 iceberg_table/data/4d6d4e3a//iceberg_table/category=macbook/id_bucket=4/delete/position/8620f0ba-28b8-46e1-81b7-b0612fbdaf8c.parquet
2021-12-07 13:40:55       1417 iceberg_table/metadata/00000-60ff8e64-b2c6-4cc1-89b1-dc740816cf2d.metadata.json
2021-12-07 13:42:39       2336 iceberg_table/metadata/00001-b6262f06-8805-40c6-9b28-7b1d361f3705.metadata.json
2021-12-07 13:45:36       3289 iceberg_table/metadata/00002-79008873-1a1b-44db-830a-cdeae6c87dde.metadata.json
2021-12-07 14:00:58       4319 iceberg_table/metadata/00003-18926040-5269-48d2-b613-cb3286fe4ad9.metadata.json
2021-12-07 14:11:46       5287 iceberg_table/metadata/00004-8f8ca31a-df57-4292-9f07-fe18f5bba334.metadata.json
2021-12-07 14:11:45       7088 iceberg_table/metadata/5c2327c2-b66f-47bb-a586-4921410731e7-m0.avro
2021-12-07 13:42:38       7033 iceberg_table/metadata/958b8acb-2800-473b-b55e-486333344762-m0.avro
2021-12-07 13:45:35       7033 iceberg_table/metadata/b8518cfe-ffeb-4b58-880b-81ffe1156b85-m0.avro
2021-12-07 14:00:58       7036 iceberg_table/metadata/f53d1984-5b09-4c82-8c76-52c125467081-m0.avro
2021-12-07 14:00:58       7089 iceberg_table/metadata/f53d1984-5b09-4c82-8c76-52c125467081-m1.avro
2021-12-07 13:42:39       4270 iceberg_table/metadata/snap-1296825016097872104-1-958b8acb-2800-473b-b55e-486333344762.avro
2021-12-07 14:00:58       4408 iceberg_table/metadata/snap-3195787545865691157-1-f53d1984-5b09-4c82-8c76-52c125467081.avro
2021-12-07 14:11:45       4450 iceberg_table/metadata/snap-3742406407748705121-1-5c2327c2-b66f-47bb-a586-4921410731e7.avro
2021-12-07 13:45:35       4340 iceberg_table/metadata/snap-6943776344382491461-1-b8518cfe-ffeb-4b58-880b-81ffe1156b85.avro

テーブルのオプティマイズ

上記の通り、データを更新し続けるとファイルが作成されます。Icebergテーブルのパフォーマンスを最適化するために、手動圧縮をサポートしています。圧縮によってテーブルの内容を変更せずにテーブルのファイルレイアウトが最適化されます。

このREWRITE DATAアクションは、述語を使用して、一致する行を含むファイルは最適化されます。圧縮操作の影響を受けるファイルの数を制御するために、WHERE句を指定できます。

下記の例では、idが2のレコードを含むファイルを最適化の対象に指定しています。

OPTIMIZE sampledb.iceberg_table REWRITE DATA
  USING BIN_PACK
  WHERE id=2;

テーブルのDROP

最後にテーブルの削除します。DROPのときはテーブル名をバッククオートで括らないとエラーになってしまいました。

DROP TABLE <code>iceberg_table</code>;

なお、他のデータレイクと同様に、テーブルをDROPしてもS3上のデータファイルが削除されません。

クエリの処理時間一覧

クエリ 処理時間(秒)
テーブル作成 0.744
1レコードINSERT 6.225
レコードのSELECT 14.857
レコードのSELECT(タイムトラベル) 12.966
レコードのUPDATE 17.359
レコードのDELETE 18.243
テーブルのオプティマイズ 6.556
テーブルのDROP 0.723

Icebergの強みを活かす

INSERT INTOで300万レコードのデータコピー

普通のAthenaのテーブルと比較して、オーバヘッドが生じることが確認できました。上記では、1レコードINSERTでも7秒かかったりしたので、300万レコードをINSERT INTOしたときの時間がどうなるのかを確認したいと思います。

上記のテーブルからパーティションを取り除いて、INSERT INTOでデータのコピー先のテーブルを作成します。

CREATE TABLE iceberg_table (
  id int,
  data string,
  category string) 
LOCATION 's3://<mybucket>/iceberg_table/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'compaction_bin_pack_target_file_size_bytes'='536870912' 
)

以下のINSERT INTOでデータをコピーしました。7.527秒なのでレコード件数ではなく、INSERT INTOの実行に6秒ほどのオーバーヘッドが生じているのかもしれません。

INSERT INTO iceberg_table 
SELECT "c_custkey", "c_name", "c_address" FROM ssbdb.customer;

オーバーヘッド回避のワークアラウンド

上記のiceberg_tableは、SELECTするのに23秒かかりました。このオーバヘッドを回避するのなら、下記のように普通のAthenaのテーブル(iceberg_table_mart)に出力したテーブルをクエリすることで8秒にレスポンスを短縮できました。ファクトテーブルは、UPSERT可能なicebergテーブルに保存して、集計したデータマートを普通のテーブルとして使い分けることでそれぞれ長所を活かすことができそうです。

CREATE TABLE iceberg_table_mart AS SELECT * FROM iceberg_table;

最後に

これまでAWSは、Hudiのサポートが充実していたのでIcebergが先行したのは想定外でした。PrestoはIcebergもサポートしていたので、Athenaもサポートになるのは正常進化だったのかもしれません。

従来のデータレイクでUPSERTは、技術的に可能であっても運用としては適していませんでしたが、ファクトに相当するデータをIcebergに格納して、洗い替えのディメンジョンや頻繁にアクセスするデータマートを通常のテーブルとして作成することで、強みを活かしたテーブル構成が可能になるのではないかと考えられます。今後、GAになったタイミングで大規模なファクトをUPSERTできるか検証してみたいと考えています。

参考文献

合わせて読みたい