Snowflake Connector for Pythonの非同期クエリを試してみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
SnowflakeをPythonから利用するための「Snowflake Connector for Python」のバージョン2.3.7から、非同期クエリのサポートが開始されました。
これまでは同期的なクエリの実行でしたが、非同期クエリがサポートされたことにより、クエリを非同期で発行して、後から結果の確認をすることができるようになります。
今回は実際にこの機能を試してみたいと思います。
前提
前提として、以下の環境で検証を行っています。
- WSL2 (Ubuntu 18.04.3 LTS)
- Python 3.8.6
- Snowflake Connector for Python 2.3.7
また、Snowflake Connector for Pythonについては以下を参考に導入済みとなります。
実行するクエリ
今回実行するクエリは以下になります。
SELECT count(*) FROM TABLE(generator(timeLimit => 5)) ;
GENERATOR関数を利用して、5秒間の間データレコードを作成します。その結果、作成されたレコード数をCOUNTで取得するクエリとなっています。
データの中身に特に意味はなく、単純に「5秒間」という期間でクエリを実行するためのクエリとなっています。
Pythonコード
次に実行するPythonコードです。pipenvを利用しており、環境変数については事前に.env
ファイルに定義しています。
SNOWFLAKE_USER=XXXXXXXXXX SNOWFLAKE_PASSWORD=XXXXXXXXXX SNOWFLAKE_ACCOUNT=XXXXXXXXXX.ap-northeast-1.aws SNOWFLAKE_WAREHOUSE=XXXXXXXXXX SNOWFLAKE_DATABASE=XXXXXXXXXX SNOWFLAKE_SCHEMA=XXXXXXXXXX SNOWFLAKE_ROLE=XXXXXXXXXX
import os import time import snowflake.connector from snowflake.connector import DictCursor def get_connection(): con = snowflake.connector.connect( user = os.environ['SNOWFLAKE_USER'], password = os.environ['SNOWFLAKE_PASSWORD'], account = os.environ['SNOWFLAKE_ACCOUNT'], role = os.environ['SNOWFLAKE_ROLE'], warehouse = os.environ['SNOWFLAKE_WAREHOUSE'], database = os.environ['SNOWFLAKE_DATABASE'], schema = os.environ['SNOWFLAKE_SCHEMA'] ) return con def get_query(query_file_path): with open(query_file_path, 'r', encoding='utf-8') as f: query = f.read() return query def exec_async_query(query_file_path): with get_connection() as con: with con.cursor(cursor_class=DictCursor) as cur: query = get_query(query_file_path) cur.execute_async(query) query_id = cur.sfqid print('async query executed.') while con.is_still_running(con.get_query_status_throw_if_error(query_id)): print('waiting query results...') time.sleep(1) cur.get_results_from_sfqid(query_id) print('recieved query results.') results = cur.fetchall() print('query result:') print(results) if __name__ == "__main__": query_file_path = 'sql/long-query.sql' exec_async_query(query_file_path)
クエリの実行はcur.execute_async(query)
とすることで非同期実行となっています。この実行後にasync query executed.
と表示して、非同期実行が終わったことを表示しています。
実行結果については、query_id = cur.sfqid
でクエリIDを取得し、con.is_still_running(con.get_query_status_throw_if_error(query_id))
を利用してクエリがまだ実行中かどうかをポーリングしてチェックしています。チェック中には1秒ごとにwaiting query results...
と表示します。
その後、クエリの結果をcur.get_results_from_sfqid(query_id)
で取得して、recieved query results.
と表示してからフェッチ結果を表示します。
実行してみる
では、実際に実行してみましょう。
$ python async-query-sample.py async query executed. waiting query results... waiting query results... waiting query results... waiting query results... waiting query results... recieved query results. query result: [{'COUNT(*)': 109586874368}]
想定どおりになりました!クエリが非同期で実行され、そのクエリ実行をポーリングして待ってから、クエリ完了後に結果表示がされていますね。
まとめ
以上、Snowflake Connector for Pythonの非同期クエリを試してみました。実行に時間の掛かるクエリで、同期的に待てない場合などにとても役立ちそうな機能ですね。
どなたかのお役に立てば幸いです。それでは!