SnowflakeのWarehouse一覧をPythonで取得してみた

2020.04.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

Snowflakeでクエリを実行する際には、利用するWarehouseを指定する必要がありますが、そもそもこの「Warehouseの一覧ってどうやって取得したら良いの?」というのが気になったので試してみました。

前提

環境などは、以前Pythonで外部SQLファイルを実行をした際の環境を流用しています。今回は実行するSQLファイルを変更することで検証します。

$ python --version
Python 3.8.2

$ pip --version
pip 20.0.2 from /home/ubuntu/.anyenv/envs/pyenv/versions/3.8.2/lib/python3.8/site-packages/pip (python 3.8)

pyenvpipenvを利用しており、Pipfile は以下の通りです。

$ cat Pipfile
[source]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
snowflake-connector-python = "*"

[requires]
python_version = "3.8"

SQLファイルの作成

実行するSQLファイルを準備します。こちらが、今回の一番のメインです。

sql/show_warehouses.sql

SHOW WAREHOUSES;

このコマンドを実行すると、現在の実行ユーザが権限を持つWarehouseの一覧が取得できます。また、このコマンドの一番の利点としてはSHOWコマンドなので、SELECTと違い実行にWarehouseを必要としないことが利点に挙げられるかと思います。

また、制約としては取得できるレコードが最大1万レコードまでとなりますが、通常は問題にはならなそうです。

2020年04月の現在では、取得できるカラムはドキュメントに記載の通り29カラムと多いのですが、以下が主に利用する情報かと思います。

カラム名 概要
name Warehouse名
state Warehouseがアクティブ/実行中(STARTED)なのか非アクティブ(SUSPENDED)なのか
size Warehouseのサイズ

Pythonコードと.envファイルの作成

次に、Pythonコードと.envファイルを作成します。

こちらは前回利用したものと、ほぼ同じです。実行するSQLファイルの指定だけ変えています。

run-query-with-sql-file.py

import os
import snowflake.connector


def get_connection():
    con = snowflake.connector.connect(
        user = os.environ['SNOWFLAKE_USER'],
        password = os.environ['SNOWFLAKE_PASSWORD'],
        account = os.environ['SNOWFLAKE_ACCOUNT'],
        warehouse = os.environ['SNOWFLAKE_WAREHOUSE'],
        database = os.environ['SNOWFLAKE_DATABASE'],
        schema = os.environ['SNOWFLAKE_SCHEMA']
    )
    return con


def get_query(query_file_path):
    with open(query_file_path, 'r', encoding='utf-8') as f:
        query = f.read()
    return query


def run_query(query_file_path):
    with get_connection() as connection:
        with connection.cursor() as cur:
            query = get_query(query_file_path)
            cur.execute(query)
            rows = cur.fetchall()
            for row in rows:
                print(row)


if __name__ == "__main__":
    query_file_path = 'sql/show_warehouses.sql'
    run_query(query_file_path)

最後に、pipenvで読み込みされるように.envファイルに環境変数を設定しておきます。こちらも前回と同様です。もちろんSNOWFLAKE_WAREHOUSESNOWFLAKE_DATABASESNOWFLAKE_SCHEMAは不要なのですが、一旦そのままとしています。

.env

SNOWFLAKE_USER=xxxxx
SNOWFLAKE_PASSWORD=xxxxx
SNOWFLAKE_ACCOUNT=xxxxx
SNOWFLAKE_WAREHOUSE=xxxxx
SNOWFLAKE_DATABASE=SNOWFLAKE_SAMPLE_DATA
SNOWFLAKE_SCHEMA=TPCH_SF1

実行してみる

では、pipenv runで実行してみます。なお、前回はpipenv shellで試しましたが、今回はpipenv runで試してみました。

$ $ pipenv run python run-query-with-sql-file.py 
Loading .env environment variables…
('DEMO_WH', 'SUSPENDED', 'STANDARD', 'X-Large', 1, 1, 0, 0, 0, 'N', 'N', 600, 'true', '', '', '', '', datetime.datetime(2020, 1, 13, 7, 44, 35, 780000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), datetime.datetime(2020, 4, 6, 2, 18, 35, 794000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), datetime.datetime(2020, 4, 6, 2, 18, 35, 794000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'SYSADMIN', 'standard-server warehouse for ETL and most analytics', 'null', 0, 0, 0, 16, '46088', 'STANDARD')
('LOAD_WH', 'SUSPENDED', 'STANDARD', 'X-Small', 1, 1, 0, 0, 0, 'N', 'Y', 600, 'true', '', '', '', '', datetime.datetime(2020, 1, 13, 7, 44, 35, 395000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), datetime.datetime(2020, 4, 7, 2, 27, 29, 911000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), datetime.datetime(2020, 4, 7, 2, 27, 29, 911000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'SYSADMIN', 'standard-server warehouse for loading data', 'null', 0, 0, 0, 1, '46084', 'STANDARD')

ちょっとカラムが多くて見づらいですが、Warehouse一覧が取得できましたね!実際利用する際には、ウェブインターフェースでの選択時に出ている情報と同じく、namesizestateあたりの情報があれば十分そうです。

ちょっと改善してみる

GitHubのコードで、connection.pycursor.pyを見ると結果をdictとして取得することもできそうなので、ちょっとPythonコードを変えて実行したいと思います。

run-query-with-sql-file.py

import os
import snowflake.connector
from snowflake.connector import DictCursor

def get_connection():
    con = snowflake.connector.connect(
        user = os.environ['SNOWFLAKE_USER'],
        password = os.environ['SNOWFLAKE_PASSWORD'],
        account = os.environ['SNOWFLAKE_ACCOUNT'],
        warehouse = os.environ['SNOWFLAKE_WAREHOUSE'],
        database = os.environ['SNOWFLAKE_DATABASE'],
        schema = os.environ['SNOWFLAKE_SCHEMA']
    )
    return con


def get_query(query_file_path):
    with open(query_file_path, 'r', encoding='utf-8') as f:
        query = f.read()
    return query


def get_query_results(query_file_path):
    with get_connection() as connection:
        with connection.cursor(cursor_class=DictCursor) as cur:
            query = get_query(query_file_path)
            cur.execute(query)
            rows = cur.fetchall()
            return rows


def show_query_results(rows, columns):
    for row in rows:
        l = [row.get(column) for column in columns]
        print(l)


if __name__ == "__main__":
    query_file_path = 'sql/show_warehouses.sql'
    rows = get_query_results(query_file_path)
    columns = ['name', 'state', 'size']
    show_query_results(rows, columns)

今度はカーソルにカーソルクラスDictCursorを指定しています。これで、クエリ結果がdictで得られるようになります。その上で、指定カラムをリストとして表示するようにshow_query_results関数を追加してみました。

結果は以下の通りです。

$ pipenv run python run-query-with-sql-file.py 
Loading .env environment variables…
['DEMO_WH', 'SUSPENDED', 'X-Large']
['LOAD_WH', 'SUSPENDED', 'X-Small']

スッキリしましたね。

まとめ

以上、SnowflakeのWarehouse一覧の取得方法でした。これでSQL実行前にこの一覧からWarehouseを選ぶことが出来そうですね。

どなたかのお役に立てば幸いです。それでは!