S3 Tables 向けの Iceberg REST Catalog API が追加されたので、PyIcebergからアクセスしてみた。

S3 Tables 向けの Iceberg REST Catalog API が追加されたので、PyIcebergからアクセスしてみた。

Clock Icon2025.03.21

データ事業本部の笠原です。

S3 Tables向けのIceberg REST Catalog APIが追加されました。

以前こちらの記事ではGlueのIceberg REST endpoint経由でS3 TablesのIcebergテーブルにアクセスしていました。

今回は、その記事をベースに新たなIceberg REST Catalog APIのendpointにアクセスしたいと思います。

概要

今回追加されたエンドポイントは以下になります。

https://s3tables.<AWS_REGION>.amazonaws.com/iceberg

S3 Tablesの準備

まずはS3 Tablesのテーブルバケット・ネームスペース・テーブルを作成します。
今回も、東京リージョンでテーブルバケットを作成します。
「AWS 分析サービスとの統合」を有効化していない場合は、マネジメントコンソールから「統合を有効にする」をクリックして有効化しておきましょう。

テーブルバケットはマネジメントコンソール上で作っても構いませんが、今回はAWS CLIで作成します。

aws s3tables create-table-bucket \
  --name pyiceberg-sample-bucket

ネームスペースとテーブルについては、以前はAWS CLIで作成しました。
今回はAthena経由でS3 Tablesのマネジメントコンソールから作成できるようになったので、せっかくなのでこれを試しましょう。

テーブルバケット作成後、テーブルバケット名をクリックするとテーブル一覧が表示されます。まだ何も作ってないので一覧内は空です。「Athenaでテーブルを作成」をクリックしましょう。

s3tables_table_list

テーブル作成の前にネームスペースを先に作成します。「名前空間を作成」が選択されていることを確認して、ネームスペース名を入力します。ここでは main としています。その後、「名前空間を作成」をクリックします。

s3tables_create_namespace1

実際にネームスペースが正常に作成できたら、「Athenaでテーブルを作成」をクリックします。

s3tables_create_namespace2

Athenaの画面が表示されます。 CREATE TABLE クエリのサンプルも表示されています。

s3tables_create_table1

CREATE TABLE クエリは以下のように変更して、実行しましょう。
パーティション設定もおまけでやってみます。

CREATE TABLE main.temperature_humidity_histories (
sensor_id int, 
created_at timestamp, 
temperature double, 
humidity double) 
PARTITIONED BY (month(created_at)) 
TBLPROPERTIES ('table_type' = 'iceberg') 

s3tables_create_table2

これで、S3 Tablesのテーブルバケットにネームスペースとテーブルが作成されました。

権限設定

IAMロールやLake Formationで権限設定を行います。

IAMロール

Lambda関数にアタッチするロールのポリシーは、 AWSLambdaBasicExecutionRole に加えて、以下のポリシーを設定します。今回は s3tables のアクションと該当テーブルバケットリソースへの権限が必要です。

PyIcebergFunctionRolePolicy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3tables:GetTableBucket",
                "s3tables:ListTableBuckets",
                "s3tables:GetNamespace",
                "s3tables:ListNamespace",
                "s3Tables:GetTable",
                "s3Tables:ListTables",
                "s3Tables:GetTableData",
                "s3Tables:PutTableData",
                "s3Tables:GetTableMetadataLocation",
                "s3Tables:UpdateTableMetadataLocation",
            ],
            "Resource": [
                "arn:aws:s3tables:ap-northeast-1:<AWS_ACCOUNT_ID>:bucket/pyiceberg-sample-bucket",
                "arn:aws:s3tables:ap-northeast-1:<AWS_ACCOUNT_ID>:bucket/pyiceberg-sample-bucket/*",
            ],
            "Effect": "Allow"
        },
        {
            "Action": "lakeformation:GetDataAccess",
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

Lake Formation

上記のIAMロールのS3Tablesへのアクセス権限を付与します。

## データベースへの権限付与
## AWS_PRINCIPAL_ARN は、Lambda関数にアタッチするIAMロールのARN
aws lakeformation grant-permissions \
  --catalog-id "${AWS_ACCOUNT_ID}" \
  --principal "{\"DataLakePrincipalIdentifier\": \"${AWS_PRINCIPAL_ARN}\"}" \
  --resource "{\"Database\": {\"CatalogId\": \"${AWS_ACCOUNT_ID}:s3tablescatalog/${TABLE_BUCKET_NAME}\", \"Name\": \"main\" }}" \
  --permissions "ALL"
## テーブルへの権限付与
aws lakeformation grant-permissions \
  --catalog-id "${AWS_ACCOUNT_ID}" \
  --principal "{\"DataLakePrincipalIdentifier\": \"${AWS_PRINCIPAL_ARN}\"}" \
  --resource "{\"Table\": {\"CatalogId\": \"${AWS_ACCOUNT_ID}:s3tablescatalog/${TABLE_BUCKET_NAME}\", \"DatabaseName\": \"main\", \"TableWildcard\": {} }}" \
  --permissions "ALL"

また、「Application integration settings」の「Allow external engines to access data in Amazon S3 Locations with full table access」については、今回チェック入れなくてもアクセスできることを確認しています。

Glue Iceberg REST endpointへのアクセスでは、チェックを入れないと、PyIcebergの load_table() 実行時に 403 AccessDenied でアクセス拒否されてしまいますが、S3 Tables Iceberg REST endpointへのアクセスでは、チェック入れなくてもアクセスには問題ないようです。

lakeformation_application_integration_false

Lambda関数

pyicebergをLambda Layerに入れて、以下のようなLambda関数を用意します。

lambda_function.py
import os
import random
import datetime
import pyarrow as pa
from pyiceberg.catalog import load_catalog

def init_rest_catalog(s3table_bucket_name: str, account_id: str, region: str, catalog_name: str = "S3TablesCatalog"):
    return load_catalog(
        catalog_name,
        **{
            "type": "rest",
            "warehouse": f"arn:aws:s3tables:{region}:{account_id}:bucket/{s3table_bucket_name}",
            "uri": f"https://s3tables.{region}.amazonaws.com/iceberg",
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "s3tables",
            "rest.signing-region": region,
        },
    )

def handler(event, context):
    s3table_bucket_name = os.environ.get("S3TABLE_BUCKET_NAME")
    account_id = os.environ.get("ACCOUNT_ID")
    region = os.environ.get("REGION")
    database_name = os.environ.get("DATABASE_NAME")
    table_name = os.environ.get("TABLE_NAME")

    JST = datetime.timezone(datetime.timedelta(hours=+9), 'JST')

    ## REST Catalog
    rest_catalog = init_rest_catalog(s3table_bucket_name=s3table_bucket_name, account_id=account_id, region=region)

    ## Load Table
    table = rest_catalog.load_table(f"{database_name}.{table_name}")

    ## Insert Data
    schema = pa.schema([
        pa.field('sensor_id', pa.int32(), nullable=False),
        pa.field('created_at', pa.timestamp('us')),
        pa.field('temperature', pa.float64()),
        pa.field('humidity', pa.float64()),
    ])
    sample_data = {
        "sensor_id": random.choice(range(1, 11)),
        "created_at": datetime.datetime.now(JST),
        "temperature": random.normalvariate(20.0, 2),
        "humidity": random.normalvariate(40.0, 10),
    }
    df = pa.Table.from_pylist([sample_data], schema=schema)
    table.append(df)

    return {
        "status_code": 200,
    }

「S3 Tables Iceberg REST Endpoint」を使ったカタログの読み込みを以下で行っています。

def init_rest_catalog(s3table_bucket_name: str, account_id: str, region: str, catalog_name: str = "S3TablesCatalog"):
    return load_catalog(
        catalog_name,
        **{
            "type": "rest",
            "warehouse": f"arn:aws:s3tables:{region}:{account_id}:bucket/{s3table_bucket_name}",
            "uri": f"https://s3tables.{region}.amazonaws.com/iceberg",
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "s3tables",
            "rest.signing-region": region,
        },
    )

ちなみに、「Glue Iceberg REST Endpoint」を使った場合は以下の通りでした。

def init_rest_catalog(s3table_bucket_name: str, account_id: str, region: str, catalog_name: str = "S3TablesCatalog"):
    return load_catalog(
        catalog_name,
        **{
            "type": "rest",
            "warehouse": f"{account_id}:s3tablescatalog/{s3table_bucket_name}",
            "uri": f"https://glue.{region}.amazonaws.com/iceberg",
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "glue",
            "rest.signing-region": region,
        },
    )

warehouseurirest.signing-name が異なっているだけで、他は変わりません。
warehouse は、Glueの場合はカタログIDを指定しますが、S3 Tablesの場合はテーブルバケットARNを指定します。

確認

データが格納されたかどうかは、Athenaで確認してみます。
Lambda関数を実行した回数分、データが登録されていることが確認できました。

select *
from temperature_humidity_histories
;

athena_result

まとめ

いかがでしたでしょうか。

S3 TablesのIceberg REST Catalog APIのendpointを経由してアクセスすることで、単一のS3テーブルバケットへのアクセスが必要な場合は、今後サードパーティからのアクセスが容易になることが予想されます。

ちょっとしたデータ加工を行う際の選択肢の一つとして検討してみてはいかがでしょうか。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.