Snowflake SQLAlchemy Toolkitを試してみた

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

SnowflakeにはPython用のコネクタが公開されており、ORMであるSQLAlchemyの対応もされています。 今回は「Snowflake SQLAlchemy」の導入から実際に試してみたいと思います。

なお、以下の記事では具体的なPandasでの利用について解説されているので、こちらもオススメです。

Snowflake Connector for Python について

PythonからSnowflakeに接続するには、「Snowflake Connector for Python」が必要となります。Pythonのバージョンとしては3.5.0以上が必要となり、pipコマンドでインストールが可能です。

Snowflake Connector for Python — Snowflake Documentation

ただ、今回はこちらについては扱いません。

Snowflake SQLAlchemy について

「Snowflake SQLAlchemy」はSnowflakeとSQLAlchemyアプリケーションを繋ぐものになります。こちらには「Snowflake Connector for Python」も内部に含まれており「Snowflake Connector for Python」の上のレイヤで「Snowflake SQLAlchemy」が動くため、「Snowflake SQLAlchemy」をインストールするだけでSnowflakeに接続することができます。

Using the Snowflake SQLAlchemy Toolkit with the Python Connector — Snowflake Documentation

今回はこちらについて扱いたいと思います。

下準備

Python環境についてですが、私はpyenv(+anyenv)とpipenvで構築しています。「Snowflake Connector for Python」の要件を満たせばよいので、Pythonのバージョンは3.5.0以上が必要ということになります。今回は3.8.1を利用することにしました。

検証用のパスに移動し、Pythonの3.8.1を利用するようにpyenvpipenvで指定します。

$ pyenv local 3.8.1
$ python --version
Python 3.8.1
$ pipenv --python 3.8.1
$ cat Pipfile 
[source]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]

[requires]
python_version = "3.8"

これで、準備環境です。

Snowflake SQLAlchemy のインストール

インストールは、下記コマンドで行います。

$ pipenv install snowflake-sqlalchemy
Installing snowflake-sqlalchemy…
Adding snowflake-sqlalchemy to Pipfile's [packages]…
✔ Installation Succeeded 
Pipfile.lock not found, creating…
Locking [dev-packages] dependencies…
Locking [packages] dependencies…
✔ Success! 
Updated Pipfile.lock (6bfa57)!
Installing dependencies from Pipfile.lock (6bfa57)…
  🐍   ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 28/28 — 00:00:55
To activate this project's virtualenv, run pipenv shell.
Alternatively, run a command inside the virtualenv with pipenv run.

インストールが成功すると、Pipfile.lockが作成されパッケージが追加されたことが確認できます。

        "snowflake-sqlalchemy": {
            "hashes": [
                "sha256:098a8e527a3b177f399c97f0f16fca83a4ddc0224844a7368e8fddce4902898c",
                "sha256:218c8d79ab608fcab90fc6ddb8b9f3cc1246f903a0b280eb6f0881fcad3b8f28"
            ],
            "index": "pypi",
            "version": "==1.2.0"
        },

これでインストールができました。

SQLAlchemy用の接続パラメータについて

接続文字列としては、以下が必須パラメータとなります。

snowflake://<user_login_name>:<password>@<account_name>
パラメータ 概要
user_login_name Snowflakeユーザのログイン名
password Snowflakeユーザのパスワード
account_name フルアカウント名(*)

「フルアカウント名」とは、SnowflakeのログインURLのうち、snowflakecomputing.comより左側の名前になります。 例えば、foobar.ap-southeast-1.aws.snowflakecomputing.comというURLの場合には、foobar.ap-southeast-1.awsとなります。

詳しくはRequired Parametersに記載されています。

更に追加パラメータとしては以下のパラメータがあります。

snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>
パラメータ 概要
database_name データベース名
schema_name スキーマ名
warehouse_name ウェアハウス名
role_name ロール名

これは、すべて初期接続時のセッションにおける値となり、もちろんセッション内で変更することが可能です。

接続してみる

では、実際に接続してみたいと思いますが、上記の通りパラメータ変数が色々とあるのでpython-dotenvもインストールしておきます。

$ pipenv install python-dotenv

インストールしたら、利用するパラメータを.envファイルに=区切りで記述しておきます。

SF_USER_LOGIN_NAME=<user_login_name>
SF_PASSWORD=<password>
SF_ACCOUNT_NAME=<account_name>
SF_DATABASE_NAME=<database_name>
SF_SCHEMA_NAME=<schema_name>
SF_WAREHOUSE_NAME=<warehouse_name>
SF_ROLE_NAME=<role_name>

その上で、以下のようなコードで接続してみます。.envからの値取得はsettings.pyから行うようにしています。

import os
from pathlib import Path
from dotenv import load_dotenv

# load env
env_path = Path('.') / '.env'
load_dotenv(env_path)

# env values
SF_USER_LOGIN_NAME = os.getenv('SF_USER_LOGIN_NAME')
SF_PASSWORD = os.getenv('SF_PASSWORD')
SF_ACCOUNT_NAME = os.getenv('SF_ACCOUNT_NAME')
SF_DATABASE_NAME = os.getenv('SF_DATABASE_NAME')
SF_SCHEMA_NAME = os.getenv('SF_SCHEMA_NAME')
SF_WAREHOUSE_NAME = os.getenv('SF_WAREHOUSE_NAME')
SF_ROLE_NAME = os.getenv('SF_ROLE_NAME')
import settings
from sqlalchemy import create_engine


def hello_snowflake():
    connection_string_format = 'snowflake://{}:{}@{}/{}/{}?warehouse={}&role={}'
    connection_string = connection_string_format.format(settings.SF_USER_LOGIN_NAME,
                                                        settings.SF_PASSWORD,
                                                        settings.SF_ACCOUNT_NAME,
                                                        settings.SF_DATABASE_NAME,
                                                        settings.SF_SCHEMA_NAME,
                                                        settings.SF_WAREHOUSE_NAME,
                                                        settings.SF_ROLE_NAME)
    engine = create_engine(connection_string)
    connection = engine.connect()

    try:
        query = 'SELECT C_CUSTKEY, C_NAME FROM CUSTOMER WHERE C_CUSTKEY <= 5'
        rows = connection.execute(query).fetchall()
        for row in rows:
            print(row)
    finally:
        connection.close()
        engine.dispose()


if __name__ == "__main__":
    hello_snowflake()

結果は以下のようになります。

$ pipenv run python3 hello-snowflake-sqlalchemy.py 
Loading .env environment variables…
(1, 'Customer#000000001')
(2, 'Customer#000000002')
(3, 'Customer#000000003')
(4, 'Customer#000000004')
(5, 'Customer#000000005')

問題なくSnowflakeに接続できており、データも取得できました。

SQLAlchemyっぽく書いてみる

もう少し、SQLAlchemyっぽく書いてみます。まずはsettings.pyを修正して、EngineSessionを持たせます。また、engine作成時の接続文字列の記述が煩雑だったので、snowflake.sqlalchemy.URLを利用してもう少しスッキリさせました。

import os
from pathlib import Path
from dotenv import load_dotenv
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# load env
env_path = Path('.') / '.env'
load_dotenv(env_path)

# env values
SF_USER_LOGIN_NAME = os.getenv('SF_USER_LOGIN_NAME')
SF_PASSWORD = os.getenv('SF_PASSWORD')
SF_ACCOUNT_NAME = os.getenv('SF_ACCOUNT_NAME')
SF_DATABASE_NAME = os.getenv('SF_DATABASE_NAME')
SF_SCHEMA_NAME = os.getenv('SF_SCHEMA_NAME')
SF_WAREHOUSE_NAME = os.getenv('SF_WAREHOUSE_NAME')
SF_ROLE_NAME = os.getenv('SF_ROLE_NAME')

# SQLAlchemy Engine
Engine = create_engine(URL(
    account=SF_ACCOUNT_NAME,
    user=SF_USER_LOGIN_NAME,
    password=SF_PASSWORD,
    database=SF_DATABASE_NAME,
    schema=SF_SCHEMA_NAME,
    warehouse=SF_WAREHOUSE_NAME,
    role=SF_ROLE_NAME
))

# SQLAlchemy Session
Session = sessionmaker(
    autocommit=False,
    expire_on_commit=False,
    autoflush=True,
    bind=Engine
)

次に、models.pyを用意してCustomerを定義します。実際のテーブル定義とはちょっと違う(カラムが足らない)のですが、今回はサンプルとして利用するカラムだけとしました。

from snowflake.sqlalchemy import URL
from sqlalchemy import Column, Numeric, String
from sqlalchemy.ext.declarative import declarative_base


Base = declarative_base()


class Customer(Base):
    __tablename__ = 'customer'

    c_custkey = Column('c_custkey', Numeric(38, 0), primary_key = True)
    c_name = Column('c_name', String)

最後にhello-snowflake-sqlalchemy-ex.pyです。最初のものより大分スッキリとして、データの取得もSQLAlchemyっぽくなっています。

from settings import Session
from models import Customer


def hello_snowflake():

    try:
        session = Session()
        rows = session.query(Customer.c_custkey, Customer.c_name).filter(
            Customer.c_custkey <= 5).all()
        for row in rows:
            print(row)
    finally:
        session.close()


if __name__ == "__main__":
    hello_snowflake()

実行してみます。実行結果は最初のものと変わりませんね。

$ pipenv run python3 hello-snowflake-sqlalchemy-ex.py
Loading .env environment variables…
(1, 'Customer#000000001')
(2, 'Customer#000000002')
(3, 'Customer#000000003')
(4, 'Customer#000000004')
(5, 'Customer#000000005')

まとめ

以上、DBクライアントツール「Aginity Pro」でSnowflakeへの接続を試してみた結果でした。接続も問題ありませんし、バッチリSQLAlchemyなコードも記述できるので良い感じにコードが書けるのではないでしょうか。

どなたかのお役に立てば幸いです。それでは!