この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
SnowflakeをPythonから利用するための「Snowflake Connector for Python」のバージョン2.3.7から、非同期クエリのサポートが開始されました。
これまでは同期的なクエリの実行でしたが、非同期クエリがサポートされたことにより、クエリを非同期で発行して、後から結果の確認をすることができるようになります。
今回は実際にこの機能を試してみたいと思います。
前提
前提として、以下の環境で検証を行っています。
- WSL2 (Ubuntu 18.04.3 LTS)
- Python 3.8.6
- Snowflake Connector for Python 2.3.7
また、Snowflake Connector for Pythonについては以下を参考に導入済みとなります。
実行するクエリ
今回実行するクエリは以下になります。
sql/long-query.sql
SELECT
count(*)
FROM
TABLE(generator(timeLimit => 5))
;
GENERATOR関数を利用して、5秒間の間データレコードを作成します。その結果、作成されたレコード数をCOUNTで取得するクエリとなっています。
データの中身に特に意味はなく、単純に「5秒間」という期間でクエリを実行するためのクエリとなっています。
Pythonコード
次に実行するPythonコードです。pipenvを利用しており、環境変数については事前に.env
ファイルに定義しています。
.env
SNOWFLAKE_USER=XXXXXXXXXX
SNOWFLAKE_PASSWORD=XXXXXXXXXX
SNOWFLAKE_ACCOUNT=XXXXXXXXXX.ap-northeast-1.aws
SNOWFLAKE_WAREHOUSE=XXXXXXXXXX
SNOWFLAKE_DATABASE=XXXXXXXXXX
SNOWFLAKE_SCHEMA=XXXXXXXXXX
SNOWFLAKE_ROLE=XXXXXXXXXX
async-query-sample.py
import os
import time
import snowflake.connector
from snowflake.connector import DictCursor
def get_connection():
con = snowflake.connector.connect(
user = os.environ['SNOWFLAKE_USER'],
password = os.environ['SNOWFLAKE_PASSWORD'],
account = os.environ['SNOWFLAKE_ACCOUNT'],
role = os.environ['SNOWFLAKE_ROLE'],
warehouse = os.environ['SNOWFLAKE_WAREHOUSE'],
database = os.environ['SNOWFLAKE_DATABASE'],
schema = os.environ['SNOWFLAKE_SCHEMA']
)
return con
def get_query(query_file_path):
with open(query_file_path, 'r', encoding='utf-8') as f:
query = f.read()
return query
def exec_async_query(query_file_path):
with get_connection() as con:
with con.cursor(cursor_class=DictCursor) as cur:
query = get_query(query_file_path)
cur.execute_async(query)
query_id = cur.sfqid
print('async query executed.')
while con.is_still_running(con.get_query_status_throw_if_error(query_id)):
print('waiting query results...')
time.sleep(1)
cur.get_results_from_sfqid(query_id)
print('recieved query results.')
results = cur.fetchall()
print('query result:')
print(results)
if __name__ == "__main__":
query_file_path = 'sql/long-query.sql'
exec_async_query(query_file_path)
クエリの実行はcur.execute_async(query)
とすることで非同期実行となっています。この実行後にasync query executed.
と表示して、非同期実行が終わったことを表示しています。
実行結果については、query_id = cur.sfqid
でクエリIDを取得し、con.is_still_running(con.get_query_status_throw_if_error(query_id))
を利用してクエリがまだ実行中かどうかをポーリングしてチェックしています。チェック中には1秒ごとにwaiting query results...
と表示します。
その後、クエリの結果をcur.get_results_from_sfqid(query_id)
で取得して、recieved query results.
と表示してからフェッチ結果を表示します。
実行してみる
では、実際に実行してみましょう。
$ python async-query-sample.py
async query executed.
waiting query results...
waiting query results...
waiting query results...
waiting query results...
waiting query results...
recieved query results.
query result:
[{'COUNT(*)': 109586874368}]
想定どおりになりました!クエリが非同期で実行され、そのクエリ実行をポーリングして待ってから、クエリ完了後に結果表示がされていますね。
まとめ
以上、Snowflake Connector for Pythonの非同期クエリを試してみました。実行に時間の掛かるクエリで、同期的に待てない場合などにとても役立ちそうな機能ですね。
どなたかのお役に立てば幸いです。それでは!