Snowflake Connector for Pythonの非同期クエリを試してみた

2021.01.13

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

SnowflakeをPythonから利用するための「Snowflake Connector for Python」のバージョン2.3.7から、非同期クエリのサポートが開始されました。

これまでは同期的なクエリの実行でしたが、非同期クエリがサポートされたことにより、クエリを非同期で発行して、後から結果の確認をすることができるようになります。

今回は実際にこの機能を試してみたいと思います。

前提

前提として、以下の環境で検証を行っています。

  • WSL2 (Ubuntu 18.04.3 LTS)
  • Python 3.8.6
  • Snowflake Connector for Python 2.3.7

また、Snowflake Connector for Pythonについては以下を参考に導入済みとなります。

実行するクエリ

今回実行するクエリは以下になります。

sql/long-query.sql

SELECT
  count(*)
FROM
  TABLE(generator(timeLimit => 5))
;

GENERATOR関数を利用して、5秒間の間データレコードを作成します。その結果、作成されたレコード数をCOUNTで取得するクエリとなっています。

データの中身に特に意味はなく、単純に「5秒間」という期間でクエリを実行するためのクエリとなっています。

Pythonコード

次に実行するPythonコードです。pipenvを利用しており、環境変数については事前に.envファイルに定義しています。

.env

SNOWFLAKE_USER=XXXXXXXXXX
SNOWFLAKE_PASSWORD=XXXXXXXXXX
SNOWFLAKE_ACCOUNT=XXXXXXXXXX.ap-northeast-1.aws
SNOWFLAKE_WAREHOUSE=XXXXXXXXXX
SNOWFLAKE_DATABASE=XXXXXXXXXX
SNOWFLAKE_SCHEMA=XXXXXXXXXX
SNOWFLAKE_ROLE=XXXXXXXXXX

async-query-sample.py

import os
import time

import snowflake.connector
from snowflake.connector import DictCursor


def get_connection():
    con = snowflake.connector.connect(
        user = os.environ['SNOWFLAKE_USER'],
        password = os.environ['SNOWFLAKE_PASSWORD'],
        account = os.environ['SNOWFLAKE_ACCOUNT'],
        role = os.environ['SNOWFLAKE_ROLE'],
        warehouse = os.environ['SNOWFLAKE_WAREHOUSE'],
        database = os.environ['SNOWFLAKE_DATABASE'],
        schema = os.environ['SNOWFLAKE_SCHEMA']
    )
    return con


def get_query(query_file_path):
    with open(query_file_path, 'r', encoding='utf-8') as f:
        query = f.read()
    return query


def exec_async_query(query_file_path):
    with get_connection() as con:
        with con.cursor(cursor_class=DictCursor) as cur:
            query = get_query(query_file_path)
            cur.execute_async(query)
            query_id = cur.sfqid
            print('async query executed.')

            while con.is_still_running(con.get_query_status_throw_if_error(query_id)):
                print('waiting query results...')
                time.sleep(1)
            
            cur.get_results_from_sfqid(query_id)
            print('recieved query results.')
            results = cur.fetchall()
            print('query result:')
            print(results)


if __name__ == "__main__":
    query_file_path = 'sql/long-query.sql'
    exec_async_query(query_file_path)

クエリの実行はcur.execute_async(query)とすることで非同期実行となっています。この実行後にasync query executed.と表示して、非同期実行が終わったことを表示しています。

実行結果については、query_id = cur.sfqidでクエリIDを取得し、con.is_still_running(con.get_query_status_throw_if_error(query_id))を利用してクエリがまだ実行中かどうかをポーリングしてチェックしています。チェック中には1秒ごとにwaiting query results...と表示します。

その後、クエリの結果をcur.get_results_from_sfqid(query_id)で取得して、recieved query results.と表示してからフェッチ結果を表示します。

実行してみる

では、実際に実行してみましょう。

$ python async-query-sample.py 
async query executed.
waiting query results...
waiting query results...
waiting query results...
waiting query results...
waiting query results...
recieved query results.
query result:
[{'COUNT(*)': 109586874368}]

想定どおりになりました!クエリが非同期で実行され、そのクエリ実行をポーリングして待ってから、クエリ完了後に結果表示がされていますね。

まとめ

以上、Snowflake Connector for Pythonの非同期クエリを試してみました。実行に時間の掛かるクエリで、同期的に待てない場合などにとても役立ちそうな機能ですね。

どなたかのお役に立てば幸いです。それでは!

参考