Glue ETL Job より Apache Iceberg を操作する

2024.03.13

こんにちは、川田です。今回は、Glue ETL job より、Apache Iceberg の Glue Data Catalog テーブルを操作する方法を確認してみます。

結論

操作する方法としては、Glue ETL Job の Job parameters に、下記2点のパラメーターを追加する必要があります。

Iceberg フレームワークの有効化

以降では、Spark SQL を利用して Iceberg を操作する手順を紹介しています。

環境

  • Glue Version 4.0
  • 利用言語は Python です

事前準備

まずは事前に検証用の環境を用意します。

S3 Bucket を作成

以下、S3 Bucket を 2 つ用意します。

$ aws s3 mb s3://raw-ap-northeast-1-cm-zunda-demo --region ap-northeast-1    # 取込ファイル配置用のバケット
$ aws s3 mb s3://stage-ap-northeast-1-cm-zunda-demo --region ap-northeast-1  # Iceberg用のバケット

テスト用の取込ファイルを配置

上記で作成した raw bucket 側に、取込用の CSV ファイルを配置しておきます。

$ cat <<EOF | aws s3 cp - s3://raw-ap-northeast-1-cm-zunda-demo/source/sample-1.csv
> id,user_name,prefecture,age
> 0001,AAA,Tokyo,20
> 0002,BBB,Osaka,30
> 0003,CCC,Tokyo,25
> 0004,DDD,Osaka,35
> 0005,EEE,Fukuoka,40
> EOF

Glue ETL Job 用の IAM Role を作成

Glue ETL Job で利用する IAM Role を作成します。

$ aws iam create-role --role-name demo-glue-etl-job-service-role --assume-role-policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "glue.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}'

AWSGlueServiceRoleAmazonS3FullAccess の Managed Policy を付与しておきます。

$ aws iam attach-role-policy --role-name demo-glue-etl-job-service-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

$ aws iam attach-role-policy --role-name demo-glue-etl-job-service-role \
--policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

Glue Data Catalog に、Iceberg 用のテーブルを作成

Glue Data Catalog に demo という名前のデータベースを作成します。

$ aws glue create-database --region ap-northeast-1 --database-input Name=demo

作成したデータベースに、sample という名前の Iceberg 用テーブルを作成しておきます。

$ aws glue create-table --database-name demo --region ap-northeast-1 \
--open-table-format-input IcebergInput={MetadataOperation=CREATE} \
--table-input \
'{
    "Name":"sample",
    "StorageDescriptor": {
        "Columns":[
            {"Name":"id", "Type":"string"},
            {"Name":"user_name", "Type":"string"},
            {"Name":"prefecture", "Type":"string"},
            {"Name":"age", "Type":"int"}
        ],
        "Location": "s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/",
        "Compressed": false
    },
    "TableType": "EXTERNAL_TABLE"
}'

Iceberg テーブル向けの操作

準備が整ったため、Iceberg テーブル向けの操作を Glue ETL より行います。

Glue ETL Job を作成/実行(ファイル取込処理)

下記コードの Glue ETL Job を作成・実行します。事前作業で用意したファイルを、Iceberg テーブルに取り込む処理となります。

import sys
from awsglue.context import GlueContext, DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.transforms import ApplyMapping
from pyspark.context import SparkContext
from pyspark.sql import DataFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
gc = GlueContext(sc)
job = Job(gc)
job.init(args["JOB_NAME"], args)
logger = gc.get_logger()

dyf: DynamicFrame = gc.create_dynamic_frame.from_options(
    connection_type="s3",
    format_options={
        "withHeader": True,
        "separator": ",",
    },
    format="csv",
    connection_options={
        "paths": ["s3://raw-ap-northeast-1-cm-zunda-demo/source/"],
        "recurse": True
    },
    transformation_ctx="load_raw_bucket"
)

dyf: DynamicFrame = ApplyMapping.apply(
    frame=dyf,
    mappings=[
        ("id", "string", "id", "string"),
        ("user_name", "string", "user_name", "string"),
        ("prefecture", "string", "prefecture", "string"),
        ("age", "string", "age", "int")
    ],
    transformation_ctx="apply_mapping"
)

df: DataFrame = dyf.toDF()
gc.write_data_frame.from_catalog(
    frame=df,
    database="demo",
    table_name="sample"
)

job.commit()

ジョブ・パラメーターには、下記を設定します。

parameter value
Name job-sink-iceberg
IAM Role demo-glue-etl-job-service-role ※準備作業で作成したもの
Type Spark
Glue version Glue 4.0
Language Python 3
Worker type G 1X
Requested number of workers 2
Job bookmark Enable
Number of retries 0
Job timeout (minutes) 180
Continuous logging Enable logs in CloudWatch.
Use Glue data catalog as the Hive metastore Enable
Job parameters --datalake-formats と --conf を追加 ※下記参考

重要な点が、 Job parameters で --datalake-formats: iceberg--conf: xxxx の値を渡すことです。

--conf パラメーターの Value 値は、具体的には下記となります。spark.sql.catalog.glue_catalog.warehouse パラメーターで指定する S3 Bucket と Path の値は、Iceberg 向けに作成した Glue Catalog テーブルで指定した S3 Bucket と Path になります。

spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.warehouse=s3://<iceberg-bucket-name>/<iceberg-bucket-path>/
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO

Iceberg 向けの設定として特別な点は、この Job parameters を渡すという点だけになります。

Iceberg フレームワークの有効化

Athena で結果確認(ファイル取込処理)

テーブル内の結果を、Athena にて確認します。

ちゃんと取り込めています。

スキーマ情報とパーティション情報。

Glue ETL Job を作成/実行(Schema/Partition Evolution)

続いて、Iceberg テーブルに area という名前のカラムを ADD COLUMN して ADD PARTITION する Spark SQL のコードを実行します。

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args["JOB_NAME"], args)
logger = gc.get_logger()

stmt_add_column = """
ALTER TABLE glue_catalog.demo.sample
    ADD COLUMNS (area string)
"""
spark.sql(stmt_add_column)

stmt_add_partition = """
ALTER TABLE glue_catalog.demo.sample
    ADD PARTITION FIELD area
"""
spark.sql(stmt_add_partition)

job.commit()

ジョブ・パラメーターの値は、job 名を除き上述ジョブと同じ値になるため省略します。

Athena で確認(Schema/Partition Evolution)

結果を確認します。

スキーマ情報とパーティション情報。ちゃんと更新されています。

実際のパーティション情報。area カラムに値のあるレコードがないため、当然全て null となっています。

Glue ETL Job を作成/実行(Insert overwrite)

パーティションの設定を反映させるため、INSERT OVERWRITE コマンドにて area カラムの値を更新します。

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args["JOB_NAME"], args)
logger = gc.get_logger()

stmt = """
    insert overwrite glue_catalog.demo.sample
    select
        id, user_name, prefecture, age,
        case
            when prefecture = 'Tokyo' then 'Kanto'
            when prefecture = 'Osaka' then 'Kansai'
            when prefecture = 'Fukuoka' then 'Kyushu'
        end as area
    from glue_catalog.demo.sample
"""
spark.sql(stmt)

job.commit()

ジョブ・パラメーターの値は、job 名を除き上述ジョブと同じ値になるため省略します。

Athena で確認(Insert overwrite)

結果を確認します。

area カラムの値が更新されています。

パーティション化されました。

その他

投稿した内容では、Iceberg 用の config の設定を ETL ジョブのパラメーターとして渡しましたが、下記のようにコード内にコンフィグを記載する方法もあるようです。

from pyspark.context import SparkContext
from pyspark.conf import SparkConf

conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://<iceberg-bucket-name>/<iceberg-bucket-path>/")
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

sc = SparkContext(conf=conf)