Glue Data Catalog 上の Parquet テーブルを Apache Iceberg テーブルにマイグレーションする

2024.03.25

こんにちは、川田です。今回は、AWS Big Data Blog に記載されていた Migrate an existing data lake to a transactional data lake using Apache Iceberg について紹介いたします。

ブログ内では、Glue Data Catalog 上で Parquet ファイルで作成されている既存データレイクを、Apache Iceberg のデータレイクに変換する方法が紹介されています。紹介されている具体的な中身について、以下にて確認していきたいと思います。

マイグレーション方法

どのようにマイグレーションを行うのか、紹介されている手法についてまとめてみます。

マイグレーションする方法として In-place data upgradeCTAS migration of data の 2 種類の方法が紹介されています。

In-place data upgrade

In-place data upgrade の方法として、さらに add_filesmigrate の方法が紹介されています。

  • add_files
    • 既存の Parquet ファイルをそのまま活かす形で Iceberg 環境を作成する
    • Iceberg 用メタ情報ファイルのみが新規で作成されるイメージ
    • add_files とは、AWS の機能ではなく Spark 環境向け Iceberg Runtime モジュール内にあるプロシージャーの 1 つとなる

This command will create metadata for the new files and will not move them. This procedure will not analyze the schema of the files to determine if they actually match the schema of the Iceberg table. Upon completion, the Iceberg table will then treat these files as if they are part of the set of files owned by Iceberg.

このコマンドは新しいファイルのメタデータを作成し、ファイルを移動ません。この手順では、ファイルのスキーマを解析して、ファイルが実際に Iceberg テーブルのスキーマと一致するかどうかを判断しないです。完了すると、Iceberg テーブルはこれらのファイルを Iceberg が所有するファイル セットの一部であるかのように扱います。

  • migrate
    • 既存の Parquet ファイルを読み込んで、Iceberg 向けのデータとして再作成する
    • こちらも AWS の機能ではなく、Spark 環境向け Iceberg Runtime モジュール内にあるプロシージャーの 1 つとなる
    • しかしながら、こちらのプロシージャーは Glue Data Catalog ではサポートされていない、とのこと

add files を利用した場合の、Props/Cons が紹介されています。

Pros

・Avoids full table scans to read the data as there is no restatement. This can save time.

・If there are any errors during while writing the metadata, only a metadata re-write is required and not the entire data.

・Lineage of the existing jobs is maintained because the existing catalog still exists.

(再記述がないため、データを読み込むためのフルテーブルスキャンを回避できる。これは時間の節約になる)

(メタデータの書き込み中にエラーが発生した場合、メタデータの再書き込みのみが必要で、データ全体の再書き込みは必要ない)

(既存のカタログがまだ存在するため、既存のジョブの系統が維持される)

Cons

・If data is processed (inserts, updates, and deletes) in the dataset during the metadata writing process, the process must be run again to include the new data.

・There must be write downtime to avoid having to run the process a second time.

・If a data restatement is required, this workflow will not work as source data files aren’t modified.

(メタデータの書き込み処理中にデータセット内でデータの処理(挿入、更新、削除)が行われた場合、新しいデータを含めるために処理を再度実行しなければならない)

(2 回目のプロセス実行を避けるためには、書き込みのダウンタイムが必要となる)

(データの再記述が必要な場合、ソース・データ・ファイルは変更されないため、このワークフローは機能しない)

CTAS migration of data

CTAS migration of data の方法として、以下 1 種類が紹介されています。

  • ctas
    • create table as select の意
    • Spark SQL の処理にて、既存 Parquet のテーブルを select して、Iceberg のテーブルとして create する

こちらも Props/Cons が紹介されています。

Pros

・It allows you to audit and validate the data during the process because data is restated.

・If there are any runtime issues during the migration process, rollback and recovery can be easily performed by deleting the Apache Iceberg table.

・You can test different configurations when migrating a source. You can create a new table for each configuration and evaluate the impact.

・Shadow data is renamed to a different directory in the source (so it doesn’t collide with old Apache Parquet data).

(データが再作成されるため、プロセス中にデータの監査と検証ができる)

(移行プロセス中に実行時の問題が発生した場合、Apache Iceberg テーブルを削除することで、ロールバックとリカバリを簡単に実行できる)

(ソースの移行時にさまざまな構成をテスト可能。各構成ごとに新しいテーブルを作成し、影響を評価できる)

(シャドウ・データはソース内の別のディレクトリにリネームされる(古い Apache Parquet データと衝突を防ぐため))

Cons

・Storage of the dataset is doubled during the process as both the original Apache Parquet and new Apache Iceberg tables are present during the migration and testing phase. This needs to be considered during cost estimation.

・The migration can take much longer (depending on the volume of the data) because both data and metadata are written.

・It’s difficult to keep tables in sync if there changes to the source table during the process.

(移行とテストの段階で元の Apache Parquet テーブルと新しい Apache Iceberg テーブルの両方が存在するため、プロセス中にデータセットのストレージが 2 倍になる。これはコスト見積もりの際に考慮する必要がある)

(データとメタデータの両方を書き込むため、移行に時間がかかる(データ量に依存))

(移行作業中に移行元テーブルに変更があった場合、テーブルを同期させるのは難しい)


以降にて、実際にマイグレーション作業を実施してみたいと思います。

環境

  • Glue Version 4.0
  • 利用言語は Python です

事前準備

マイグレーション元となるデータを用意

マイグレーション元となるデータを配置する S3 バケットを作成します。

$ aws --profile s3 private mb s3://stage-ap-northeast-1-cm-zunda-demo --region ap-northeast-1

マイグレーション元となる Parquet ファイルを用意します。用意するファイルは NOAA Global Historical Climatology Network Daily (GHCN-D) のデータセットとなり、世界の気象情報を記録したデータとのことです。Registry of Open Data on AWS の S3 バケットより、上記で作成したバケットまで sync させます。

$ aws s3 sync s3://noaa-ghcn-pds/parquet/by_year/YEAR=2024/ELEMENT=AWDR/ s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=AWDR/
$ aws s3 sync s3://noaa-ghcn-pds/parquet/by_year/YEAR=2024/ELEMENT=AWND/ s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=AWND/
$ aws s3 sync s3://noaa-ghcn-pds/parquet/by_year/YEAR=2024/ELEMENT=DAPR/ s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=DAPR/

sync した結果です。yearELEMENT の値でパーティション化できるようなディレクトリ構造になっています。

$ aws s3 ls --human-readable --recursive s3://stage-ap-northeast-1-cm-zunda-demo/parquet/
2024-03-15 18:59:09    8.6 KiB parquet/year=2024/ELEMENT=AWDR/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet
2024-03-15 18:59:25  143.3 KiB parquet/year=2024/ELEMENT=AWND/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet
2024-03-15 18:59:36  113.4 KiB parquet/year=2024/ELEMENT=DAPR/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet

マイグレーション元データの、Glue Data Catalog テーブルを作成

続いて Glue クローラーを利用して、sync したデータの Glue Data Catalog テーブルを作成します。

まず、Glue Data Catalog に demo という名前のデータベースを作成します。

$ aws glue create-database --region ap-northeast-1 --database-input Name=demo

クローラーで利用する IAM ロールを作成します。

$ aws iam create-role --role-name demo-glue-crawler-service-role --assume-role-policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}'

作成した IAM ロールに、AWSGlueServiceRoleAmazonS3FullAccess のポリシーを付与しておきます。

$ aws iam attach-role-policy --role-name demo-glue-crawler-service-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

$ aws iam attach-role-policy --role-name demo-glue-crawler-service-role \
--policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

クローラーを作成します。demo_source_parquet という名前にてテーブルを作成させます。

$ aws glue create-crawler --region ap-northeast-1 \
--name demo-crawler-source \
--role demo-glue-crawler-service-role \
--database demo \
--table-prefix demo_source_ \
--targets \
'{
    "S3Targets": [
        { "Path": "s3://stage-ap-northeast-1-cm-zunda-demo/parquet/" }
    ]
}'

クローラーを実行します。

$ aws glue start-crawler --region ap-northeast-1 --name demo-crawler-source

以下、テーブルが作成されました。

マイグレーション

In-place data upgrade (add_files) 処理を実行

それでは、実際に In-place data upgrade 方式の add_files プロシージャーを Glue Spark 上で実行してみたいと思います。

該当プロシージャーを実行するまでの手順を、Jupyter Notebook の ipynb ファイルにまとめてくれており、そのファイルが公開されています。

公開されている Jupyter Notebook のファイル

AWS Big Data Blog では、上述の Jupyter Notebook を Glue Studio 上で実行する手順が示されていますが、(AWS 利用コストの節約のため...) 今回はローカル PC 上に作成した Glue 開発環境で実行してみます。ローカル PC 上の Glue 開発環境については、下記の投稿で紹介されています。

Glue Docker Image を利用した Glue Job ローカル開発をためしてみた


今回は実行処理を PySpark のスクリプトに起こして実行させています。スクリプト自体は以下の gist にて公開しています。

iceberg_migration_add_files.py

それでは、具体的な処理コードとその結果ログを確認していきます。

まず、Spark セッションを作成。

from pyspark.sql import SparkSession

catalog_name = "glue_catalog"
bucket_name = "stage-ap-northeast-1-cm-zunda-demo"
warehouse_path = f"s3://{bucket_name}/iceberg"

spark = SparkSession.builder \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

spark.sparkContext.setLogLevel('OFF')

最初に、空の Iceberg テーブルを作成します。demo_target_iceberg_add_files という名前で作成しています。

# Create an empty Iceberg table
query = f"""
create table if not exists {catalog_name}.demo.`demo_target_iceberg_add_files`
using iceberg
tblproperties ("format-version"="2")
partitioned by (year, element)
as
    select * from demo.`demo_source_parquet` limit 0
"""
spark.sql(query)

続いて確認用のクエリを実行していきます。

データベース内のテーブル一覧を表示。作成されています。

spark.sql("use demo")
spark.sql("show tables").show(truncate=False)
+---------+-----------------------------+-----------+
|namespace|tableName                    |isTemporary|
+---------+-----------------------------+-----------+
|demo     |demo_source_parquet          |false      |
|demo     |demo_target_iceberg_add_files|false      |
+---------+-----------------------------+-----------+

作成した Iceberg テーブルのメタ情報を確認してみます。データファイルの存在を確認。空のテーブルのため、データファイルは存在していません。

query_select_files = f"select file_path from {catalog_name}.demo.demo_target_iceberg_add_files.files"
spark.sql(query_select_files).show(10, truncate=False)
+---------+
|file_path|
+---------+
+---------+

同様にメタ情報から、次はマニフェストファイルの有無を確認してみます。

query_select_snapshots = f"select snapshot_id, manifest_list from {catalog_name}.demo.demo_target_iceberg_add_files.snapshots"
spark.sql(query_select_snapshots).show(10, truncate=False)

以下、マニフェストファイルは作成されています。

注目したい点が、マニフェストファイルは s3://<bucket-name>/iceberg に作成されている点です。このパスは Spark セッション作成時に指定した Iceberg テーブル向けのパスとなり、Parquet のファイルを配置しているパス s3://<bucket-name>/parquet/ とは異なる点です。

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |manifest_list                                                                                                                                                      |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4759882607015209639|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_add_files/metadata/snap-4759882607015209639-1-e86c9f7f-46cb-4422-bf4f-64b256b771f6.avro|
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

ようやくですが、くだんの add_files プロシージャーを実行します。

# Run the add_files procedure
query = f"""
call {catalog_name}.system.add_files(
    table => 'demo.demo_target_iceberg_add_files',
    source_table => 'demo.demo_source_parquet'
)
"""
spark.sql(query).show(truncate=False)
+-----------------+                                                             
|added_files_count|
+-----------------+
|3                |
+-----------------+

先ほどと同様に Iceberg テーブルのメタ情報を確認してみます。

データファイルが追加されています。このデータファイルが準備作業で配置した Parquet ファイルとなっており、既存の Parquet ファイルがそのまま Iceberg テーブルとして登録されている事が分かります。

+------------------------------------------------------------------------------------------------------------------------+
|file_path                                                                                                               |
+------------------------------------------------------------------------------------------------------------------------+
|s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=AWDR/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet|
|s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=AWND/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet|
|s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=DAPR/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet|
+------------------------------------------------------------------------------------------------------------------------+

マニフェストファイルも更新されています。

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |manifest_list                                                                                                                                                      |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4759882607015209639|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_add_files/metadata/snap-4759882607015209639-1-e86c9f7f-46cb-4422-bf4f-64b256b771f6.avro|
|8518956183766463425|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_add_files/metadata/snap-8518956183766463425-1-a8bc31c2-fc42-4783-8280-6f379a0db162.avro|
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

試しに、この状態で Iceberg テーブルに 1 レコードを追記してみます。

# add one record
query = f"""
insert into {catalog_name}.demo.demo_target_iceberg_add_files
    select * from demo.demo_source_parquet limit 1
"""
spark.sql(query)

メタ情報を確認してみます。

新しいデータファイルが追加されました。また追加されたデータファイルのパスは、Iceberg テーブル向けのパスの方に作成されています。こんな動きになるのですね。

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|file_path                                                                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_add_files/data/year=2024/element=AWND/00000-2014-e3865750-b51f-4229-8bde-4ec92f975a16-00001.parquet|
|s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=AWDR/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet                                                       |
|s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=AWND/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet                                                       |
|s3://stage-ap-northeast-1-cm-zunda-demo/parquet/year=2024/ELEMENT=DAPR/c3959f73af7f43d5999842b47f0e1772_0.snappy.parquet                                                       |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

マイグレーション結果の確認のため、Athena より source と target テーブルのレコード件数を確認してみます。

まずは source 側の demo_source_parquet テーブルのレコード件数を確認。95934 件です。

$ aws athena start-query-execution --region ap-northeast-1 \
--query-string "select count(*) as cnt from demo.demo_source_parquet;" \
--result-configuration OutputLocation=s3://athena-query-output-ap-northeast-1-xxxxx
{
    "QueryExecutionId": "5f2768fb-1451-4318-8547-f29785ec89c7"
}

$ aws athena get-query-results --region ap-northeast-1 --query-execution-id 5f2768fb-1451-4318-8547-f29785ec89c7 \
| jq -r '.ResultSet.Rows[].Data[].VarCharValue'
cnt
95934

続いて、target 側である demo_target_iceberg_add_files テーブルのレコード件数を確認。95935 件です。Source と比較すると 1 件多いですが、これは 1 レコード追記処理をしたからですね。マイグレーションできているようです。

$ aws athena start-query-execution --region ap-northeast-1 \
--query-string "select count(*) as cnt from demo.demo_target_iceberg_add_files;" \
--result-configuration OutputLocation=s3://athena-query-output-ap-northeast-1-xxxxxx
{
    "QueryExecutionId": "c3163435-dff5-4896-a2d9-e3b357cc026f"
}

$ aws athena get-query-results --region ap-northeast-1 --query-execution-id c3163435-dff5-4896-a2d9-e3b357cc026f \
| jq -r '.ResultSet.Rows[].Data[].VarCharValue'
cnt
95935

CTAS migration of data 処理を実行

CTAS migration of data 処理でのマイグレーションも試してみます。と言っても、こちらは Spark SQL で create table as select するものなので、何となくイメージできますね。

実行した PySpark スクリプトは、以下 gist のものとなります。

iceberg_migration_ctas.py

Spark SQL で create table as select をします。source となる demo_source_parquet テーブルを select し、demo_target_iceberg_ctas という名前のテーブルを create しています。

# create table as select
query = f"""
create table {catalog_name}.demo.`demo_target_iceberg_ctas`
using iceberg
tblproperties ("format-version"="2")
partitioned by (year,element)
as
    select * from demo.`demo_source_parquet`
    order by year, element
"""
spark.sql(query)

こちらも同様に Iceberg のメタ情報を確認してみます。

データファイルは、Iceberg テーブル向けのパスの方に作成されています。新しくデータファイルが作成された訳ですね。

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|file_path                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_ctas/data/year=2024/element=AWDR/00000-6-47293939-e52c-48c8-b6c5-7b78d395e302-00001.parquet|
|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_ctas/data/year=2024/element=AWND/00000-6-47293939-e52c-48c8-b6c5-7b78d395e302-00002.parquet|
|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_ctas/data/year=2024/element=DAPR/00001-7-37b9396d-f4f8-44d0-97bc-b823b5fcfe96-00001.parquet|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |manifest_list                                                                                                                                                 |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|8408604770096223428|s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/demo.db/demo_target_iceberg_ctas/metadata/snap-8408604770096223428-1-21290179-a85a-41b3-8d45-cf2c4b9f1c3b.avro|
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------+

マイグレーション結果の確認のため、Athena より source と target テーブルのレコード件数を確認してみます。

demo_source_parquet テーブルは、上記で確認して 95934 件とわかっているので省略し、target 側の demo_target_iceberg_ctas テーブルのレコード件数を確認します。

$ aws athena start-query-execution --region ap-northeast-1 \
--query-string "select count(*) as cnt from demo.demo_target_iceberg_ctas;" \
--result-configuration OutputLocation=s3://athena-query-output-ap-northeast-1-xxxxxx
{
    "QueryExecutionId": "b5361ee1-ccb2-4549-9819-dda43b31f4e2"
}

$ aws athena get-query-results --region ap-northeast-1 --query-execution-id b5361ee1-ccb2-4549-9819-dda43b31f4e2 \
| jq -r '.ResultSet.Rows[].Data[].VarCharValue'
cnt
95934

95934 件で一致しますね。マイグレーションされています。