【Databricks】Iceberg REST catalogを利用してPyIcebergでアクセスしてみた

【Databricks】Iceberg REST catalogを利用してPyIcebergでアクセスしてみた

Clock Icon2025.04.10

データ事業本部のueharaです。

今回は、DatabricksのIceberg REST catalogを利用してPyIcebergでテーブルにアクセスしてみたいと思います。

テーブル作成

DatabricksはDelta Lakeに保存されたテーブルのIcebergの読み取りをサポートしています。

https://docs.databricks.com/gcp/en/delta/uniform

テーブルに対してこの機能を有効化すると、Deltaテーブルにデータを書き込むために使用されるコンピューティング上でIcebergのメタデータの生成が非同期的に実行されます。

この機能を利用すれば、SnowflakeやPyIcebergからIcebergテーブルとして参照できます。

統合については以下のページをご確認下さい。

https://docs.databricks.com/aws/en/external-access/integrations

早速、以下のようにAmazon S3上に外部テーブルとしてDeltaテーブルを新規作成し、Iceberg読み取りを有効化してみます。

CREATE TABLE sample_table (
  id STRING,
  name STRING,
  score INT
)
USING DELTA
LOCATION 's3://uehara-databricks-test/sample-table/'
TBLPROPERTIES(
  'delta.columnMapping.mode' = 'name',
  'delta.enableIcebergCompatV2' = 'true',
  'delta.universalFormat.enabledFormats' = 'iceberg'
);

テーブルの作成ができたら、適当にデータを挿入してみます。

INSERT INTO sample_table VALUES
('1', 'Alice', 100),
('2', 'Bob', 90),
('3', 'Charlie', 80),
('4', 'Dave', 70),
('5', 'Eve', 60);

データの挿入ができたら、一旦テーブルを確認してみます。

0410_databricks_01

これで準備ができました。

S3やメタデータ生成ステータスの確認

ロケーションとして設定したS3のパスを確認すると _delta_log の他 metadata が作成されていることが分かります。

sample-table/
├── Kp/
│   └── part-00000-0486d4c1-8268-4858-a0b6-1055f519891f.c000.zstd.parquet
├── _delta_log/
│   ├── 00000000000000000000.crc
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.crc
│   ├── 00000000000000000001.json
│   └── _commits/
└── metadata/
    ├── 00000-e8a79cd8-9210-43c0-973b-90599285b946.metadata.json
    ├── 00001-e51afb2a-2b10-4a14-825f-280a487e2b65.metadata.json
    ├── 00002-c579d903-de56-4ba7-80c7-cb8e82e87e53.metadata.json
    ├── cb315dc5-b3c0-4656-aa2a-f13d579e0c81-m0.avro
    └── snap-4128383503645490189-1-cb315dc5-b3c0-4656-aa2a-f13d579e0c81.avro

冒頭でIcebergのメタデータは非同期で生成されると記載しましたが、メタデータの生成ステータスを追跡するために以下のメタデータフィールドを確認することができます。(参考

0410_databricks_02

DESCRIBE EXTENDED <table_name>; でこれらメタデータフィールドの情報を確認することができるので、実際に確認したところ以下のようになっていました。

0410_databricks_03

PyIcebergでアクセス

PyIcebergpip を利用して以下でインストールできます。

$ pip install pyiceberg

また、接続のためのURIとTokenを環境変数に設定しておきます。

$ export UC_ICEBERG_REST_URI=<workspace-url>/api/2.1/unity-catalog/iceberg

$ export DATABRICKS_TOKEN=<your-token>

<workspace-url> には、ご自身のDatabricks workspaceのurlを、 <token> にはアクセスのためのトークンを指定してください。

上記を踏まえた上で、以下のPythonプログラムを作成します。

access_with_iceberg.py
import os

from pyiceberg.catalog import load_catalog

# エンドポイントのURI
UC_ICEBERG_REST_URI = os.environ.get("UC_ICEBERG_REST_URI")
# Databricksパーソナルアクセストークン
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN")
# Unity Catalogのカタログ名
UC_CATALOG_NAME = "workspace"

# Unity CatalogのRESTカタログプロパティ
properties = {
    "type": "rest", 
    "uri": UC_ICEBERG_REST_URI,
    "token": DATABRICKS_TOKEN,
    "warehouse": UC_CATALOG_NAME,
}

try:
    catalog = load_catalog(UC_CATALOG_NAME, **properties)

    # テーブルのロード
    table_name = "default.sample_table"
    print(f"Loading Iceberg table: {table_name}")
    table = catalog.load_table(table_name)
    print("Table loaded successfully.")

    # データのスキャンとPandas DataFrameへのロード
    print("Data loaded into Pandas DataFrame:")
    df = table.scan().to_pandas()

    print(df.to_string(index=False))

except Exception as e:
    print(f"An error occurred: {e}")

UC_CATALOG_NAME に設定しているカタログ名はご自身の環境に合わせて下さい。

なお、外部テーブル(クラウドのオブジェクトストレージ内にデータを保存)において、外部システムからデータファイルにアクセスする場合、Unity Catalog権限は適用されません。

つまり、S3へのデータアクセス自体については別途アクセスキー等の設定が必要になります。

今回はAWS STSを利用し一時認証情報を取得して環境変数に設定し、作成したPythonスクリプトを実行するBashスクリプトを用意しました。(IAMロールには自己信頼ポリシーを追加しておく必要があります)

get_secret_and_execute_py.sh
#!/usr/bin/env bash
set -euo pipefail

PROFILE=$1

unset AWS_PROFILE
export AWS_SDK_LOAD_CONFIG=true

ROLE_ARN=$(aws --profile ${PROFILE} configure get role_arn)
STS_RESULT=$(aws --profile ${PROFILE} sts assume-role --role-arn ${ROLE_ARN} --role-session-name sls-deploy-session --duration-seconds 3600)

export AWS_ACCESS_KEY_ID=$(echo ${STS_RESULT} | jq -r '.Credentials.AccessKeyId')
export AWS_SECRET_ACCESS_KEY=$(echo ${STS_RESULT} | jq -r '.Credentials.SecretAccessKey')
export AWS_SESSION_TOKEN=$(echo ${STS_RESULT} | jq -r '.Credentials.SessionToken')
export AWS_REGION='ap-northeast-1'

python access_with_iceberg.py

実行コマンドは次のようになります。

$ bash get_secret_and_execute_py.sh <YOUR-AWS-PROFILE>

実行の結果、以下の通りテーブルデータが取得できたことを確認しました。

0410_databricks_04

最後に

今回は、DatabricksのIceberg REST catalogを利用してPyIcebergでテーブルにアクセスしてみました。

参考になりましたら幸いです。

参考文献

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.