SnowflakeのWarehouse一覧をPythonで取得してみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
Snowflakeでクエリを実行する際には、利用するWarehouseを指定する必要がありますが、そもそもこの「Warehouseの一覧ってどうやって取得したら良いの?」というのが気になったので試してみました。
前提
環境などは、以前Pythonで外部SQLファイルを実行をした際の環境を流用しています。今回は実行するSQLファイルを変更することで検証します。
$ python --version Python 3.8.2 $ pip --version pip 20.0.2 from /home/ubuntu/.anyenv/envs/pyenv/versions/3.8.2/lib/python3.8/site-packages/pip (python 3.8)
pyenv
とpipenv
を利用しており、Pipfile
は以下の通りです。
$ cat Pipfile [source] name = "pypi" url = "https://pypi.org/simple" verify_ssl = true [dev-packages] [packages] snowflake-connector-python = "*" [requires] python_version = "3.8"
SQLファイルの作成
実行するSQLファイルを準備します。こちらが、今回の一番のメインです。
SHOW WAREHOUSES;
このコマンドを実行すると、現在の実行ユーザが権限を持つWarehouseの一覧が取得できます。また、このコマンドの一番の利点としてはSHOW
コマンドなので、SELECT
と違い実行にWarehouseを必要としないことが利点に挙げられるかと思います。
また、制約としては取得できるレコードが最大1万レコードまでとなりますが、通常は問題にはならなそうです。
2020年04月の現在では、取得できるカラムはドキュメントに記載の通り29カラムと多いのですが、以下が主に利用する情報かと思います。
カラム名 | 概要 |
---|---|
name | Warehouse名 |
state | Warehouseがアクティブ/実行中(STARTED )なのか非アクティブ(SUSPENDED )なのか |
size | Warehouseのサイズ |
Pythonコードと.envファイルの作成
次に、Pythonコードと.env
ファイルを作成します。
こちらは前回利用したものと、ほぼ同じです。実行するSQLファイルの指定だけ変えています。
import os import snowflake.connector def get_connection(): con = snowflake.connector.connect( user = os.environ['SNOWFLAKE_USER'], password = os.environ['SNOWFLAKE_PASSWORD'], account = os.environ['SNOWFLAKE_ACCOUNT'], warehouse = os.environ['SNOWFLAKE_WAREHOUSE'], database = os.environ['SNOWFLAKE_DATABASE'], schema = os.environ['SNOWFLAKE_SCHEMA'] ) return con def get_query(query_file_path): with open(query_file_path, 'r', encoding='utf-8') as f: query = f.read() return query def run_query(query_file_path): with get_connection() as connection: with connection.cursor() as cur: query = get_query(query_file_path) cur.execute(query) rows = cur.fetchall() for row in rows: print(row) if __name__ == "__main__": query_file_path = 'sql/show_warehouses.sql' run_query(query_file_path)
最後に、pipenvで読み込みされるように.env
ファイルに環境変数を設定しておきます。こちらも前回と同様です。もちろんSNOWFLAKE_WAREHOUSE
、SNOWFLAKE_DATABASE
、SNOWFLAKE_SCHEMA
は不要なのですが、一旦そのままとしています。
SNOWFLAKE_USER=xxxxx SNOWFLAKE_PASSWORD=xxxxx SNOWFLAKE_ACCOUNT=xxxxx SNOWFLAKE_WAREHOUSE=xxxxx SNOWFLAKE_DATABASE=SNOWFLAKE_SAMPLE_DATA SNOWFLAKE_SCHEMA=TPCH_SF1
実行してみる
では、pipenv run
で実行してみます。なお、前回はpipenv shell
で試しましたが、今回はpipenv run
で試してみました。
$ $ pipenv run python run-query-with-sql-file.py Loading .env environment variables… ('DEMO_WH', 'SUSPENDED', 'STANDARD', 'X-Large', 1, 1, 0, 0, 0, 'N', 'N', 600, 'true', '', '', '', '', datetime.datetime(2020, 1, 13, 7, 44, 35, 780000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), datetime.datetime(2020, 4, 6, 2, 18, 35, 794000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), datetime.datetime(2020, 4, 6, 2, 18, 35, 794000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'SYSADMIN', 'standard-server warehouse for ETL and most analytics', 'null', 0, 0, 0, 16, '46088', 'STANDARD') ('LOAD_WH', 'SUSPENDED', 'STANDARD', 'X-Small', 1, 1, 0, 0, 0, 'N', 'Y', 600, 'true', '', '', '', '', datetime.datetime(2020, 1, 13, 7, 44, 35, 395000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), datetime.datetime(2020, 4, 7, 2, 27, 29, 911000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), datetime.datetime(2020, 4, 7, 2, 27, 29, 911000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), 'SYSADMIN', 'standard-server warehouse for loading data', 'null', 0, 0, 0, 1, '46084', 'STANDARD')
ちょっとカラムが多くて見づらいですが、Warehouse一覧が取得できましたね!実際利用する際には、ウェブインターフェースでの選択時に出ている情報と同じく、name
、size
、state
あたりの情報があれば十分そうです。
ちょっと改善してみる
GitHubのコードで、connection.pyやcursor.pyを見ると結果をdictとして取得することもできそうなので、ちょっとPythonコードを変えて実行したいと思います。
import os import snowflake.connector from snowflake.connector import DictCursor def get_connection(): con = snowflake.connector.connect( user = os.environ['SNOWFLAKE_USER'], password = os.environ['SNOWFLAKE_PASSWORD'], account = os.environ['SNOWFLAKE_ACCOUNT'], warehouse = os.environ['SNOWFLAKE_WAREHOUSE'], database = os.environ['SNOWFLAKE_DATABASE'], schema = os.environ['SNOWFLAKE_SCHEMA'] ) return con def get_query(query_file_path): with open(query_file_path, 'r', encoding='utf-8') as f: query = f.read() return query def get_query_results(query_file_path): with get_connection() as connection: with connection.cursor(cursor_class=DictCursor) as cur: query = get_query(query_file_path) cur.execute(query) rows = cur.fetchall() return rows def show_query_results(rows, columns): for row in rows: l = [row.get(column) for column in columns] print(l) if __name__ == "__main__": query_file_path = 'sql/show_warehouses.sql' rows = get_query_results(query_file_path) columns = ['name', 'state', 'size'] show_query_results(rows, columns)
今度はカーソルにカーソルクラスDictCursor
を指定しています。これで、クエリ結果がdictで得られるようになります。その上で、指定カラムをリストとして表示するようにshow_query_results
関数を追加してみました。
結果は以下の通りです。
$ pipenv run python run-query-with-sql-file.py Loading .env environment variables… ['DEMO_WH', 'SUSPENDED', 'X-Large'] ['LOAD_WH', 'SUSPENDED', 'X-Small']
スッキリしましたね。
まとめ
以上、SnowflakeのWarehouse一覧の取得方法でした。これでSQL実行前にこの一覧からWarehouseを選ぶことが出来そうですね。
どなたかのお役に立てば幸いです。それでは!