Amazon Athena Apache IcebergテーブルフォーマットによるACID Transactionを試してみました! #reinvent
データアナリティクス事業本部コンサルティングチームの石川です。Apache Icebergテーブルフォーマットを用いて、AthenaからACIDにINSERT、UPDATE、DELETE、タイムトラベル、オプティマイズの操作を可能にする新機能のパプリックプレビューが開始しました。Amazon S3データに対して信頼性の高い行レベルの変更が可能になりますので、実際に試してみます。
AthenaのApache Iceberg機能はプレビューリリースであり、変更される可能性があります。潜在的なデータの損失または破損を回避するために、本番データセットでこのプレビューを使用しないでください。
Apache Icebergとは
今回は、Amazon AthenaがサポートしたApache Icebergというテーブルフォーマットは、巨大なペタバイト規模のテーブル用に設計されたオープンなテーブルフォーマットです。テーブルフォーマットは、テーブルを構成するすべてのファイルをどのように管理、整理、追跡するかを決定することです。この中には、物理データファイル(ParquetやAvroなど)やテーブルの管理情報が含まれます。
IcebergのACID Transactionを試す!
AWS Lake Formationを有効にしていない状態の米国東部 (バージニア北部)で、下記の動作を確認しました。
データレイク用のS3バケット・フォルダの準備
一般的なデータレイクと同様にバケットを用意します。今回はバケットの下にiceberg_table
というフォルダを作成しました。
プレビュー中の機能を有効するWorkspaceを作成
プレビュー中の機能を有効にするには、AmazonAthenaIcebergPreview
という名前のWorkspaceを使用してクエリを実行する必要があります。おまじないのようなものだと思ってください。下記の以外の項目はデフォルトのままで構いません。
Query Editorを開き、WorkgroupからAmazonAthenaIcebergPreview
を選択します。
準備ができたので、動作を確認します。
テーブルの作成
table_typeにはICEBERG
、formatはparquet
を指定します。なお、2つ目のパーティションキーに指定したbucket(16,id)
は、id_bucket
というパーティションキーに変換されます。
CREATE TABLE iceberg_table ( id int, data string, category string) PARTITIONED BY (category, bucket(16,id)) LOCATION 's3://<mybucket>/iceberg_table/' TBLPROPERTIES ( 'table_type'='ICEBERG', 'format'='parquet', 'compaction_bin_pack_target_file_size_bytes'='536870912' )
テーブルを作成した直後は、S3のフォルダにメタデータのみが登録されています。
% aws s3 ls s3://<mybucket>/iceberg_table/ --recursive 2021-12-06 21:30:30 0 iceberg_table/ 2021-12-07 13:40:55 1417 iceberg_table/metadata/00000-60ff8e64-b2c6-4cc1-89b1-dc740816cf2d.metadata.json
レコードのINSERT
以下の2行を順番にINSERTします。
INSERT INTO sampledb.iceberg_table (id, data, category) values(1, 'mac-intel', 'macbook'); INSERT INTO sampledb.iceberg_table (id, data, category) values(2, 'mac-m1', 'macbook');
一行目をINSERTするのに6.225秒かかりました。2行目も概ね同じくらい時間がかかりました。
レコードのSELECTとタイムトラベル
INSERTしたレコードを確認します。
SELECT * from sampledb.iceberg_table;
タイムトラベル(Oracleのフラッシュバッククエリのようなもの)という機能を使って、過去に遡ってクエリすることもできます。「一日前」に指定すると先程作ったばかりのテーブルなので、0レコードとなります。
SELECT * FROM sampledb.iceberg_table FOR SYSTEM_TIME AS OF (current_timestamp - interval '1' day)
ここでデータがどのような状態で格納されているのか確認したいと思います。タイムスタンプから、データをINSERTするたびに2つのAVROファイル、1つのParquetファイル、1つのjsonファイルが作成されていることが確認できます。
% aws s3 ls s3://<mybucket>/iceberg_table/ --recursive 2021-12-06 21:30:30 0 iceberg_table/ 2021-12-07 13:42:38 453 iceberg_table/data/3494c159//iceberg_table/category=macbook/id_bucket=4/82f67f06-a0e1-4522-ba48-3cbe6d3a8be6.parquet 2021-12-07 13:45:35 438 iceberg_table/data/4b0be337//iceberg_table/category=macbook/id_bucket=4/dde0a679-84f8-46f4-891e-33355a25f396.parquet 2021-12-07 13:40:55 1417 iceberg_table/metadata/00000-60ff8e64-b2c6-4cc1-89b1-dc740816cf2d.metadata.json 2021-12-07 13:42:39 2336 iceberg_table/metadata/00001-b6262f06-8805-40c6-9b28-7b1d361f3705.metadata.json 2021-12-07 13:45:36 3289 iceberg_table/metadata/00002-79008873-1a1b-44db-830a-cdeae6c87dde.metadata.json 2021-12-07 13:42:38 7033 iceberg_table/metadata/958b8acb-2800-473b-b55e-486333344762-m0.avro 2021-12-07 13:45:35 7033 iceberg_table/metadata/b8518cfe-ffeb-4b58-880b-81ffe1156b85-m0.avro 2021-12-07 13:42:39 4270 iceberg_table/metadata/snap-1296825016097872104-1-958b8acb-2800-473b-b55e-486333344762.avro 2021-12-07 13:45:35 4340 iceberg_table/metadata/snap-6943776344382491461-1-b8518cfe-ffeb-4b58-880b-81ffe1156b85.avro
レコードのUPDATE
idが2
のレコードを更新します。
UPDATE sampledb.iceberg_table SET data='mac-m1-max' WHERE id = 2;
正常に更新されました。レコードを確認します。
SELECT * from sampledb.iceberg_table;
レコードのDELETE
idが1
のレコードを削除します。
DELETE FROM iceberg_table WHERE id=1;
正常に削除されました。レコードを確認します。
SELECT * from sampledb.iceberg_table;
もう一度、ここでデータがどのような状態で格納されているのか確認したいと思います。データが更新されるたびにファイルが作成されていることが確認できます。
% aws s3 ls s3://<mybucket>/iceberg_table/ --recursive 2021-12-06 21:30:30 0 iceberg_table/ 2021-12-07 14:00:57 943 iceberg_table/data/1e23f44b//iceberg_table/category=macbook/id_bucket=4/delete/position/c9a1ddd3-7b1b-4ca3-be6d-fc3218746c66.parquet 2021-12-07 14:00:58 458 iceberg_table/data/227a0b59//iceberg_table/category=macbook/id_bucket=4/821f0cd3-0fc4-4f7e-9e32-3c5b50c73d8f.parquet 2021-12-07 13:42:38 453 iceberg_table/data/3494c159//iceberg_table/category=macbook/id_bucket=4/82f67f06-a0e1-4522-ba48-3cbe6d3a8be6.parquet 2021-12-07 13:45:35 438 iceberg_table/data/4b0be337//iceberg_table/category=macbook/id_bucket=4/dde0a679-84f8-46f4-891e-33355a25f396.parquet 2021-12-07 14:11:44 941 iceberg_table/data/4d6d4e3a//iceberg_table/category=macbook/id_bucket=4/delete/position/8620f0ba-28b8-46e1-81b7-b0612fbdaf8c.parquet 2021-12-07 13:40:55 1417 iceberg_table/metadata/00000-60ff8e64-b2c6-4cc1-89b1-dc740816cf2d.metadata.json 2021-12-07 13:42:39 2336 iceberg_table/metadata/00001-b6262f06-8805-40c6-9b28-7b1d361f3705.metadata.json 2021-12-07 13:45:36 3289 iceberg_table/metadata/00002-79008873-1a1b-44db-830a-cdeae6c87dde.metadata.json 2021-12-07 14:00:58 4319 iceberg_table/metadata/00003-18926040-5269-48d2-b613-cb3286fe4ad9.metadata.json 2021-12-07 14:11:46 5287 iceberg_table/metadata/00004-8f8ca31a-df57-4292-9f07-fe18f5bba334.metadata.json 2021-12-07 14:11:45 7088 iceberg_table/metadata/5c2327c2-b66f-47bb-a586-4921410731e7-m0.avro 2021-12-07 13:42:38 7033 iceberg_table/metadata/958b8acb-2800-473b-b55e-486333344762-m0.avro 2021-12-07 13:45:35 7033 iceberg_table/metadata/b8518cfe-ffeb-4b58-880b-81ffe1156b85-m0.avro 2021-12-07 14:00:58 7036 iceberg_table/metadata/f53d1984-5b09-4c82-8c76-52c125467081-m0.avro 2021-12-07 14:00:58 7089 iceberg_table/metadata/f53d1984-5b09-4c82-8c76-52c125467081-m1.avro 2021-12-07 13:42:39 4270 iceberg_table/metadata/snap-1296825016097872104-1-958b8acb-2800-473b-b55e-486333344762.avro 2021-12-07 14:00:58 4408 iceberg_table/metadata/snap-3195787545865691157-1-f53d1984-5b09-4c82-8c76-52c125467081.avro 2021-12-07 14:11:45 4450 iceberg_table/metadata/snap-3742406407748705121-1-5c2327c2-b66f-47bb-a586-4921410731e7.avro 2021-12-07 13:45:35 4340 iceberg_table/metadata/snap-6943776344382491461-1-b8518cfe-ffeb-4b58-880b-81ffe1156b85.avro
テーブルのオプティマイズ
上記の通り、データを更新し続けるとファイルが作成されます。Icebergテーブルのパフォーマンスを最適化するために、手動圧縮をサポートしています。圧縮によってテーブルの内容を変更せずにテーブルのファイルレイアウトが最適化されます。
このREWRITE DATA
アクションは、述語を使用して、一致する行を含むファイルは最適化されます。圧縮操作の影響を受けるファイルの数を制御するために、WHERE
句を指定できます。
下記の例では、idが2
のレコードを含むファイルを最適化の対象に指定しています。
OPTIMIZE sampledb.iceberg_table REWRITE DATA USING BIN_PACK WHERE id=2;
テーブルのDROP
最後にテーブルの削除します。DROPのときはテーブル名をバッククオートで括らないとエラーになってしまいました。
DROP TABLE <code>iceberg_table</code>;
なお、他のデータレイクと同様に、テーブルをDROPしてもS3上のデータファイルが削除されません。
クエリの処理時間一覧
クエリ | 処理時間(秒) |
---|---|
テーブル作成 | 0.744 |
1レコードINSERT | 6.225 |
レコードのSELECT | 14.857 |
レコードのSELECT(タイムトラベル) | 12.966 |
レコードのUPDATE | 17.359 |
レコードのDELETE | 18.243 |
テーブルのオプティマイズ | 6.556 |
テーブルのDROP | 0.723 |
Icebergの強みを活かす
INSERT INTOで300万レコードのデータコピー
普通のAthenaのテーブルと比較して、オーバヘッドが生じることが確認できました。上記では、1レコードINSERTでも7秒かかったりしたので、300万レコードをINSERT INTOしたときの時間がどうなるのかを確認したいと思います。
上記のテーブルからパーティションを取り除いて、INSERT INTOでデータのコピー先のテーブルを作成します。
CREATE TABLE iceberg_table ( id int, data string, category string) LOCATION 's3://<mybucket>/iceberg_table/' TBLPROPERTIES ( 'table_type'='ICEBERG', 'format'='parquet', 'compaction_bin_pack_target_file_size_bytes'='536870912' )
以下のINSERT INTOでデータをコピーしました。7.527秒なのでレコード件数ではなく、INSERT INTOの実行に6秒ほどのオーバーヘッドが生じているのかもしれません。
INSERT INTO iceberg_table SELECT "c_custkey", "c_name", "c_address" FROM ssbdb.customer;
オーバーヘッド回避のワークアラウンド
上記のiceberg_tableは、SELECTするのに23秒かかりました。このオーバヘッドを回避するのなら、下記のように普通のAthenaのテーブル(iceberg_table_mart)に出力したテーブルをクエリすることで8秒にレスポンスを短縮できました。ファクトテーブルは、UPSERT可能なicebergテーブルに保存して、集計したデータマートを普通のテーブルとして使い分けることでそれぞれ長所を活かすことができそうです。
CREATE TABLE iceberg_table_mart AS SELECT * FROM iceberg_table;
最後に
これまでAWSは、Hudiのサポートが充実していたのでIcebergが先行したのは想定外でした。PrestoはIcebergもサポートしていたので、Athenaもサポートになるのは正常進化だったのかもしれません。
従来のデータレイクでUPSERTは、技術的に可能であっても運用としては適していませんでしたが、ファクトに相当するデータをIcebergに格納して、洗い替えのディメンジョンや頻繁にアクセスするデータマートを通常のテーブルとして作成することで、強みを活かしたテーブル構成が可能になるのではないかと考えられます。今後、GAになったタイミングで大規模なファクトをUPSERTできるか検証してみたいと考えています。