Snowflake SQLAlchemy Toolkitを試してみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
SnowflakeにはPython用のコネクタが公開されており、ORMであるSQLAlchemyの対応もされています。 今回は「Snowflake SQLAlchemy」の導入から実際に試してみたいと思います。
なお、以下の記事では具体的なPandasでの利用について解説されているので、こちらもオススメです。
Snowflake Connector for Python について
PythonからSnowflakeに接続するには、「Snowflake Connector for Python」が必要となります。Pythonのバージョンとしては3.5.0
以上が必要となり、pip
コマンドでインストールが可能です。
Snowflake Connector for Python — Snowflake Documentation
ただ、今回はこちらについては扱いません。
Snowflake SQLAlchemy について
「Snowflake SQLAlchemy」はSnowflakeとSQLAlchemyアプリケーションを繋ぐものになります。こちらには「Snowflake Connector for Python」も内部に含まれており「Snowflake Connector for Python」の上のレイヤで「Snowflake SQLAlchemy」が動くため、「Snowflake SQLAlchemy」をインストールするだけでSnowflakeに接続することができます。
Using the Snowflake SQLAlchemy Toolkit with the Python Connector — Snowflake Documentation
今回はこちらについて扱いたいと思います。
下準備
Python環境についてですが、私はpyenv(+anyenv)とpipenvで構築しています。「Snowflake Connector for Python」の要件を満たせばよいので、Pythonのバージョンは3.5.0
以上が必要ということになります。今回は3.8.1
を利用することにしました。
検証用のパスに移動し、Pythonの3.8.1
を利用するようにpyenv
とpipenv
で指定します。
$ pyenv local 3.8.1 $ python --version Python 3.8.1 $ pipenv --python 3.8.1 $ cat Pipfile [source] name = "pypi" url = "https://pypi.org/simple" verify_ssl = true [dev-packages] [packages] [requires] python_version = "3.8"
これで、準備環境です。
Snowflake SQLAlchemy のインストール
インストールは、下記コマンドで行います。
$ pipenv install snowflake-sqlalchemy Installing snowflake-sqlalchemy… Adding snowflake-sqlalchemy to Pipfile's [packages]… ✔ Installation Succeeded Pipfile.lock not found, creating… Locking [dev-packages] dependencies… Locking [packages] dependencies… ✔ Success! Updated Pipfile.lock (6bfa57)! Installing dependencies from Pipfile.lock (6bfa57)… ? ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 28/28 — 00:00:55 To activate this project's virtualenv, run pipenv shell. Alternatively, run a command inside the virtualenv with pipenv run.
インストールが成功すると、Pipfile.lock
が作成されパッケージが追加されたことが確認できます。
"snowflake-sqlalchemy": { "hashes": [ "sha256:098a8e527a3b177f399c97f0f16fca83a4ddc0224844a7368e8fddce4902898c", "sha256:218c8d79ab608fcab90fc6ddb8b9f3cc1246f903a0b280eb6f0881fcad3b8f28" ], "index": "pypi", "version": "==1.2.0" },
これでインストールができました。
SQLAlchemy用の接続パラメータについて
接続文字列としては、以下が必須パラメータとなります。
snowflake://<user_login_name>:<password>@<account_name>
パラメータ | 概要 |
---|---|
user_login_name | Snowflakeユーザのログイン名 |
password | Snowflakeユーザのパスワード |
account_name | フルアカウント名(*) |
「フルアカウント名」とは、SnowflakeのログインURLのうち、snowflakecomputing.com
より左側の名前になります。
例えば、foobar.ap-southeast-1.aws.snowflakecomputing.com
というURLの場合には、foobar.ap-southeast-1.aws
となります。
詳しくはRequired Parametersに記載されています。
更に追加パラメータとしては以下のパラメータがあります。
snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>
パラメータ | 概要 |
---|---|
database_name | データベース名 |
schema_name | スキーマ名 |
warehouse_name | ウェアハウス名 |
role_name | ロール名 |
これは、すべて初期接続時のセッションにおける値となり、もちろんセッション内で変更することが可能です。
接続してみる
では、実際に接続してみたいと思いますが、上記の通りパラメータ変数が色々とあるのでpython-dotenvもインストールしておきます。
$ pipenv install python-dotenv
インストールしたら、利用するパラメータを.env
ファイルに=
区切りで記述しておきます。
SF_USER_LOGIN_NAME=<user_login_name> SF_PASSWORD=<password> SF_ACCOUNT_NAME=<account_name> SF_DATABASE_NAME=<database_name> SF_SCHEMA_NAME=<schema_name> SF_WAREHOUSE_NAME=<warehouse_name> SF_ROLE_NAME=<role_name>
その上で、以下のようなコードで接続してみます。.env
からの値取得はsettings.py
から行うようにしています。
import os from pathlib import Path from dotenv import load_dotenv # load env env_path = Path('.') / '.env' load_dotenv(env_path) # env values SF_USER_LOGIN_NAME = os.getenv('SF_USER_LOGIN_NAME') SF_PASSWORD = os.getenv('SF_PASSWORD') SF_ACCOUNT_NAME = os.getenv('SF_ACCOUNT_NAME') SF_DATABASE_NAME = os.getenv('SF_DATABASE_NAME') SF_SCHEMA_NAME = os.getenv('SF_SCHEMA_NAME') SF_WAREHOUSE_NAME = os.getenv('SF_WAREHOUSE_NAME') SF_ROLE_NAME = os.getenv('SF_ROLE_NAME')
import settings from sqlalchemy import create_engine def hello_snowflake(): connection_string_format = 'snowflake://{}:{}@{}/{}/{}?warehouse={}&role={}' connection_string = connection_string_format.format(settings.SF_USER_LOGIN_NAME, settings.SF_PASSWORD, settings.SF_ACCOUNT_NAME, settings.SF_DATABASE_NAME, settings.SF_SCHEMA_NAME, settings.SF_WAREHOUSE_NAME, settings.SF_ROLE_NAME) engine = create_engine(connection_string) connection = engine.connect() try: query = 'SELECT C_CUSTKEY, C_NAME FROM CUSTOMER WHERE C_CUSTKEY <= 5' rows = connection.execute(query).fetchall() for row in rows: print(row) finally: connection.close() engine.dispose() if __name__ == "__main__": hello_snowflake()
結果は以下のようになります。
$ pipenv run python3 hello-snowflake-sqlalchemy.py Loading .env environment variables… (1, 'Customer#000000001') (2, 'Customer#000000002') (3, 'Customer#000000003') (4, 'Customer#000000004') (5, 'Customer#000000005')
問題なくSnowflakeに接続できており、データも取得できました。
SQLAlchemyっぽく書いてみる
もう少し、SQLAlchemyっぽく書いてみます。まずはsettings.py
を修正して、Engine
とSession
を持たせます。また、engine作成時の接続文字列の記述が煩雑だったので、snowflake.sqlalchemy.URL
を利用してもう少しスッキリさせました。
import os from pathlib import Path from dotenv import load_dotenv from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # load env env_path = Path('.') / '.env' load_dotenv(env_path) # env values SF_USER_LOGIN_NAME = os.getenv('SF_USER_LOGIN_NAME') SF_PASSWORD = os.getenv('SF_PASSWORD') SF_ACCOUNT_NAME = os.getenv('SF_ACCOUNT_NAME') SF_DATABASE_NAME = os.getenv('SF_DATABASE_NAME') SF_SCHEMA_NAME = os.getenv('SF_SCHEMA_NAME') SF_WAREHOUSE_NAME = os.getenv('SF_WAREHOUSE_NAME') SF_ROLE_NAME = os.getenv('SF_ROLE_NAME') # SQLAlchemy Engine Engine = create_engine(URL( account=SF_ACCOUNT_NAME, user=SF_USER_LOGIN_NAME, password=SF_PASSWORD, database=SF_DATABASE_NAME, schema=SF_SCHEMA_NAME, warehouse=SF_WAREHOUSE_NAME, role=SF_ROLE_NAME )) # SQLAlchemy Session Session = sessionmaker( autocommit=False, expire_on_commit=False, autoflush=True, bind=Engine )
次に、models.py
を用意してCustomer
を定義します。実際のテーブル定義とはちょっと違う(カラムが足らない)のですが、今回はサンプルとして利用するカラムだけとしました。
from snowflake.sqlalchemy import URL from sqlalchemy import Column, Numeric, String from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class Customer(Base): __tablename__ = 'customer' c_custkey = Column('c_custkey', Numeric(38, 0), primary_key = True) c_name = Column('c_name', String)
最後にhello-snowflake-sqlalchemy-ex.py
です。最初のものより大分スッキリとして、データの取得もSQLAlchemyっぽくなっています。
from settings import Session from models import Customer def hello_snowflake(): try: session = Session() rows = session.query(Customer.c_custkey, Customer.c_name).filter( Customer.c_custkey <= 5).all() for row in rows: print(row) finally: session.close() if __name__ == "__main__": hello_snowflake()
実行してみます。実行結果は最初のものと変わりませんね。
$ pipenv run python3 hello-snowflake-sqlalchemy-ex.py Loading .env environment variables… (1, 'Customer#000000001') (2, 'Customer#000000002') (3, 'Customer#000000003') (4, 'Customer#000000004') (5, 'Customer#000000005')
まとめ
以上、DBクライアントツール「Aginity Pro」でSnowflakeへの接続を試してみた結果でした。接続も問題ありませんし、バッチリSQLAlchemyなコードも記述できるので良い感じにコードが書けるのではないでしょうか。
どなたかのお役に立てば幸いです。それでは!