この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
SnowflakeにはPython用のコネクタが公開されており、ORMであるSQLAlchemyの対応もされています。 今回は「Snowflake SQLAlchemy」の導入から実際に試してみたいと思います。
なお、以下の記事では具体的なPandasでの利用について解説されているので、こちらもオススメです。
Snowflake Connector for Python について
PythonからSnowflakeに接続するには、「Snowflake Connector for Python」が必要となります。Pythonのバージョンとしては3.5.0
以上が必要となり、pip
コマンドでインストールが可能です。
Snowflake Connector for Python — Snowflake Documentation
ただ、今回はこちらについては扱いません。
Snowflake SQLAlchemy について
「Snowflake SQLAlchemy」はSnowflakeとSQLAlchemyアプリケーションを繋ぐものになります。こちらには「Snowflake Connector for Python」も内部に含まれており「Snowflake Connector for Python」の上のレイヤで「Snowflake SQLAlchemy」が動くため、「Snowflake SQLAlchemy」をインストールするだけでSnowflakeに接続することができます。
Using the Snowflake SQLAlchemy Toolkit with the Python Connector — Snowflake Documentation
今回はこちらについて扱いたいと思います。
下準備
Python環境についてですが、私はpyenv(+anyenv)とpipenvで構築しています。「Snowflake Connector for Python」の要件を満たせばよいので、Pythonのバージョンは3.5.0
以上が必要ということになります。今回は3.8.1
を利用することにしました。
検証用のパスに移動し、Pythonの3.8.1
を利用するようにpyenv
とpipenv
で指定します。
$ pyenv local 3.8.1
$ python --version
Python 3.8.1
$ pipenv --python 3.8.1
$ cat Pipfile
[source]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
[requires]
python_version = "3.8"
これで、準備環境です。
Snowflake SQLAlchemy のインストール
インストールは、下記コマンドで行います。
$ pipenv install snowflake-sqlalchemy
Installing snowflake-sqlalchemy…
Adding snowflake-sqlalchemy to Pipfile's [packages]…
✔ Installation Succeeded
Pipfile.lock not found, creating…
Locking [dev-packages] dependencies…
Locking [packages] dependencies…
✔ Success!
Updated Pipfile.lock (6bfa57)!
Installing dependencies from Pipfile.lock (6bfa57)…
? ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 28/28 — 00:00:55
To activate this project's virtualenv, run pipenv shell.
Alternatively, run a command inside the virtualenv with pipenv run.
インストールが成功すると、Pipfile.lock
が作成されパッケージが追加されたことが確認できます。
Pipfile.lock(抜粋)
"snowflake-sqlalchemy": {
"hashes": [
"sha256:098a8e527a3b177f399c97f0f16fca83a4ddc0224844a7368e8fddce4902898c",
"sha256:218c8d79ab608fcab90fc6ddb8b9f3cc1246f903a0b280eb6f0881fcad3b8f28"
],
"index": "pypi",
"version": "==1.2.0"
},
これでインストールができました。
SQLAlchemy用の接続パラメータについて
接続文字列としては、以下が必須パラメータとなります。
snowflake://<user_login_name>:<password>@<account_name>
パラメータ | 概要 |
---|---|
user_login_name | Snowflakeユーザのログイン名 |
password | Snowflakeユーザのパスワード |
account_name | フルアカウント名(*) |
「フルアカウント名」とは、SnowflakeのログインURLのうち、snowflakecomputing.com
より左側の名前になります。
例えば、foobar.ap-southeast-1.aws.snowflakecomputing.com
というURLの場合には、foobar.ap-southeast-1.aws
となります。
詳しくはRequired Parametersに記載されています。
更に追加パラメータとしては以下のパラメータがあります。
snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>
パラメータ | 概要 |
---|---|
database_name | データベース名 |
schema_name | スキーマ名 |
warehouse_name | ウェアハウス名 |
role_name | ロール名 |
これは、すべて初期接続時のセッションにおける値となり、もちろんセッション内で変更することが可能です。
接続してみる
では、実際に接続してみたいと思いますが、上記の通りパラメータ変数が色々とあるのでpython-dotenvもインストールしておきます。
$ pipenv install python-dotenv
インストールしたら、利用するパラメータを.env
ファイルに=
区切りで記述しておきます。
.env
SF_USER_LOGIN_NAME=<user_login_name>
SF_PASSWORD=<password>
SF_ACCOUNT_NAME=<account_name>
SF_DATABASE_NAME=<database_name>
SF_SCHEMA_NAME=<schema_name>
SF_WAREHOUSE_NAME=<warehouse_name>
SF_ROLE_NAME=<role_name>
その上で、以下のようなコードで接続してみます。.env
からの値取得はsettings.py
から行うようにしています。
settings.py
import os
from pathlib import Path
from dotenv import load_dotenv
# load env
env_path = Path('.') / '.env'
load_dotenv(env_path)
# env values
SF_USER_LOGIN_NAME = os.getenv('SF_USER_LOGIN_NAME')
SF_PASSWORD = os.getenv('SF_PASSWORD')
SF_ACCOUNT_NAME = os.getenv('SF_ACCOUNT_NAME')
SF_DATABASE_NAME = os.getenv('SF_DATABASE_NAME')
SF_SCHEMA_NAME = os.getenv('SF_SCHEMA_NAME')
SF_WAREHOUSE_NAME = os.getenv('SF_WAREHOUSE_NAME')
SF_ROLE_NAME = os.getenv('SF_ROLE_NAME')
hello-snowflake-sqlalchemy.py
import settings
from sqlalchemy import create_engine
def hello_snowflake():
connection_string_format = 'snowflake://{}:{}@{}/{}/{}?warehouse={}&role={}'
connection_string = connection_string_format.format(settings.SF_USER_LOGIN_NAME,
settings.SF_PASSWORD,
settings.SF_ACCOUNT_NAME,
settings.SF_DATABASE_NAME,
settings.SF_SCHEMA_NAME,
settings.SF_WAREHOUSE_NAME,
settings.SF_ROLE_NAME)
engine = create_engine(connection_string)
connection = engine.connect()
try:
query = 'SELECT C_CUSTKEY, C_NAME FROM CUSTOMER WHERE C_CUSTKEY <= 5'
rows = connection.execute(query).fetchall()
for row in rows:
print(row)
finally:
connection.close()
engine.dispose()
if __name__ == "__main__":
hello_snowflake()
結果は以下のようになります。
$ pipenv run python3 hello-snowflake-sqlalchemy.py
Loading .env environment variables…
(1, 'Customer#000000001')
(2, 'Customer#000000002')
(3, 'Customer#000000003')
(4, 'Customer#000000004')
(5, 'Customer#000000005')
問題なくSnowflakeに接続できており、データも取得できました。
SQLAlchemyっぽく書いてみる
もう少し、SQLAlchemyっぽく書いてみます。まずはsettings.py
を修正して、Engine
とSession
を持たせます。また、engine作成時の接続文字列の記述が煩雑だったので、snowflake.sqlalchemy.URL
を利用してもう少しスッキリさせました。
settings.py
import os
from pathlib import Path
from dotenv import load_dotenv
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# load env
env_path = Path('.') / '.env'
load_dotenv(env_path)
# env values
SF_USER_LOGIN_NAME = os.getenv('SF_USER_LOGIN_NAME')
SF_PASSWORD = os.getenv('SF_PASSWORD')
SF_ACCOUNT_NAME = os.getenv('SF_ACCOUNT_NAME')
SF_DATABASE_NAME = os.getenv('SF_DATABASE_NAME')
SF_SCHEMA_NAME = os.getenv('SF_SCHEMA_NAME')
SF_WAREHOUSE_NAME = os.getenv('SF_WAREHOUSE_NAME')
SF_ROLE_NAME = os.getenv('SF_ROLE_NAME')
# SQLAlchemy Engine
Engine = create_engine(URL(
account=SF_ACCOUNT_NAME,
user=SF_USER_LOGIN_NAME,
password=SF_PASSWORD,
database=SF_DATABASE_NAME,
schema=SF_SCHEMA_NAME,
warehouse=SF_WAREHOUSE_NAME,
role=SF_ROLE_NAME
))
# SQLAlchemy Session
Session = sessionmaker(
autocommit=False,
expire_on_commit=False,
autoflush=True,
bind=Engine
)
次に、models.py
を用意してCustomer
を定義します。実際のテーブル定義とはちょっと違う(カラムが足らない)のですが、今回はサンプルとして利用するカラムだけとしました。
models.py
from snowflake.sqlalchemy import URL
from sqlalchemy import Column, Numeric, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Customer(Base):
__tablename__ = 'customer'
c_custkey = Column('c_custkey', Numeric(38, 0), primary_key = True)
c_name = Column('c_name', String)
最後にhello-snowflake-sqlalchemy-ex.py
です。最初のものより大分スッキリとして、データの取得もSQLAlchemyっぽくなっています。
hello-snowflake-sqlalchemy-ex.py
from settings import Session
from models import Customer
def hello_snowflake():
try:
session = Session()
rows = session.query(Customer.c_custkey, Customer.c_name).filter(
Customer.c_custkey <= 5).all()
for row in rows:
print(row)
finally:
session.close()
if __name__ == "__main__":
hello_snowflake()
実行してみます。実行結果は最初のものと変わりませんね。
$ pipenv run python3 hello-snowflake-sqlalchemy-ex.py
Loading .env environment variables…
(1, 'Customer#000000001')
(2, 'Customer#000000002')
(3, 'Customer#000000003')
(4, 'Customer#000000004')
(5, 'Customer#000000005')
まとめ
以上、DBクライアントツール「Aginity Pro」でSnowflakeへの接続を試してみた結果でした。接続も問題ありませんし、バッチリSQLAlchemyなコードも記述できるので良い感じにコードが書けるのではないでしょうか。
どなたかのお役に立てば幸いです。それでは!