PythonでSnowflakeへExcelからデータを連携する
データアナリティクス事業本部、池田です。
前回のブログ で、
Google スプレッドシートからSnowflakeへの連携を書いたので、
今回はExcelのデータを投入してみます。
方式の検討
今回はAmazon S3などを使わずに済ませたかったため、
Snowflakeのステージは、外部ではなく、内部ステージ
を使う前提で考えました。
Excelからアップロードできるファイル(JSON)への変換はPandasを使うとして、
そのあとは…
- Python(PUTコマンドでのアップロード) + Snowpipe + REST API
- Python(PUTコマンドでのアップロード) + Python(COPYコマンドでのロード)
個人的にはシンプルに実装するなら後者のCOPYコマンドかと思います。
PythonアプリからCOPYコマンドでなくREST APIを使う場合の動機は、
ロードの設定をアプリ側でなくSnowflake(パイプ)側に持たせられるとか、
Snowpipeの ファイルロードメタデータ
で再ロード防止したいとかでしょうか…
今回は、PythonでCOPYコマンドを使って実装をしていきます。
REST APIは機会があればブログにしたいと思います。
以下、環境は、Windows10・Python3.7・SnowflakeはSYSADMINロールで作業しています。
サンプルとして使用するExcelファイルは↓のような感じです。
input.xlsx
ロード先のSnowflakeテーブルの作成
前回 はSnowflakeのコンソールからGUIで作業したので、今回はさくっとクエリでやります。
内部ステージ のうち、 テーブルステージ を使用するので、ステージの明示的な作成はしません。 (テーブルの所有者以外がロードするような場合は、名前付きステージの方を使う必要があります。)
前回と同じく、テーブルには連携された全ての回のデータをJSONで蓄積して、ビューで最新回のみ抽出する構成です。
-- ファイル形式名・テーブル名・ビュー名 SET (format_name, table_name, view_name) = ('{ファイル形式名}', '{テーブル名}', '{ビュー名}'); -- ファイル形式 CREATE FILE FORMAT IDENTIFIER($format_name) TYPE = 'JSON' COMPRESSION = 'AUTO' STRIP_OUTER_ARRAY = TRUE; -- テーブル CREATE TABLE IDENTIFIER($table_name) ("V" VARIANT NOT NULL) STAGE_COPY_OPTIONS = (PURGE = TRUE); -- ビュー CREATE VIEW IDENTIFIER($view_name) AS SELECT V:id_str::VARCHAR AS "ID_STR", V:str::VARCHAR AS "STR", V:date::TIMESTAMP_NTZ AS "DATE", V:int::INT AS "INT", V:str_ja::VARCHAR AS "STR_JA" FROM TABLE($table_name) WHERE V:import_datetime = ( SELECT MAX(V:import_datetime) FROM TABLE($table_name) ) ;
※↑コンテキストは適宜設定されている前提です。
ファイル形式
の STRIP_OUTER_ARRAY
は、
JSON パーサーに外側の括弧(つまり [ ])を削除するよう指示するブール値。
テーブル
の STAGE_COPY_OPTIONS
の PURGE
は、
データが正常にロードされた後、ステージからデータファイルを自動的に削除するかどうかを指定するブール値。
IDENTIFIER()
と TABLE()
はセッション変数(2行目)を
うまく扱うため
に噛ませています。
Snowflakeへアップロードとロードを行うPythonの作成
Pandas でExcelファイル→JSON一時ファイルに変換して、 Python用Snowflakeコネクタ (要インストール)でアップロード(PUT)とロード(COPY)のクエリを発行します。
import sys import os import time import pandas as pd import snowflake.connector # Snowflakeの接続先情報 TABLE = "{テーブル}" FORMAT = "{フォーマット}" SCHEMA = "{スキーマ}" DATABASE = "{データベース}" WAREHOUSE = "{ウェアハウス}" # Snowflakeの認証情報 ACCOUNT = "{アカウント}" USER = "{ユーザ}" PASSWORD = "{パスワード}" AFFIX = str(round(time.time())) TMP_JSON_FILE_NAME = "tmp_excel-to-snowflake_{}.json".format(AFFIX) def main(): # 実行引数で設定変更できるようにする args = sys.argv print("param: {}".format(args)) args_len = len(args) target_file_name = args[1] if args_len > 1 else None target_sheet_index = int(args[2]) if args_len > 2 else None global TABLE, FORMAT, SCHEMA, DATABASE, WAREHOUSE, ACCOUNT, USER, PASSWORD TABLE = args[3] if args_len > 3 else TABLE FORMAT = args[4] if args_len > 4 else FORMAT SCHEMA = args[5] if args_len > 5 else SCHEMA DATABASE = args[6] if args_len > 6 else DATABASE WAREHOUSE = args[7] if args_len > 7 else WAREHOUSE ACCOUNT = args[8] if args_len > 8 else ACCOUNT USER = args[9] if args_len > 9 else USER PASSWORD = args[10] if args_len > 10 else PASSWORD # Pythonファイルのディレクトリで作業する os.chdir(os.path.dirname(os.path.abspath(__file__))) # JSONファイルの作成 excel_to_json_file(target_file_name, target_sheet_index) try: # Snowflakeへのアップロードとロード import_to_sf() except Exception: raise finally: # JSONファイルの削除 del_tmp_file() def excel_to_json_file(target_file_name, target_sheet_index): target_file_name = target_file_name \ or input("Input file name: ").rstrip() \ or "./input.xlsx" target_sheet_index = target_sheet_index or 0 print("file: {}".format(target_file_name), "sheet: {}".format(target_sheet_index), sep="\n") # pandasでExcelを読込み df = pd.read_excel(target_file_name, sheet_name=target_sheet_index, dtype=str) # 文字列が数値になるのを避けるためのdtype df["import_datetime"] = AFFIX # ビューで絞り込みするための連携日時 with open(TMP_JSON_FILE_NAME, "w", encoding="utf-8") as w_file: df.to_json(w_file, orient="records", force_ascii=False, date_format="iso") def del_tmp_file(): os.remove(TMP_JSON_FILE_NAME) def import_to_sf(): input_abspath = os.path.abspath(TMP_JSON_FILE_NAME) # Python用Snowflakeコネクタ with snowflake.connector.connect(user=USER, password=PASSWORD, account=ACCOUNT, warehouse=WAREHOUSE, database=DATABASE, schema=SCHEMA) as conn, \ conn.cursor() as cur: # テーブルステージへアップロード(PUT) q_put = "PUT %s %s;" param_put = ("file://{}".format(input_abspath), "@%{}".format(TABLE)) print("query: {}".format(q_put), "param: {}".format(param_put), sep="\n") cur.execute(q_put, param_put) # テーブルへロード(COPY) q_copy = "COPY INTO IDENTIFIER(%s) " \ "FILE_FORMAT = (FORMAT_NAME = %s) PATTERN = %s;" param_copy = (TABLE, FORMAT, ".*{}.*".format(TMP_JSON_FILE_NAME)) print("query: {}".format(q_copy), "param: {}".format(param_copy), sep="\n") cur.execute(q_copy, param_copy) if __name__ == "__main__": main()
import_to_sf()
内で、各クエリを発行していますが、
COPYのクエリではテーブルステージを使用しているので、
ステージ名を省略しています。
テーブルの場所にあるファイルからデータをコピーする場合、Snowflakeはテーブルの場所にあるファイルを自動的にチェックするため、 FROM 句を省略できます。
COPYコマンド
では PATTERN
もしくは FILES
で指定しない限り、
ステージ内の他のファイルもロード対象になるようだったので、
意図しないロードを防ぐためファイル指定しました。
動かす
Pythonコード中にSnowflakeの認証・接続先情報を記載するか、 実行時の引数で指定します。
python ./excel-to-snowflake.py ./input.xlsx 0 EXCEL_TABLE
python ./excel-to-snowflake.py {Excelファイル名} {シート番号} {テーブル名} {ファイル形式名} {スキーマ名} {DB名} {ウェアハウス名} {アカウント名} {ユーザ名} {パスワード}
↓データが連携されてビューで見ることができました。
おわりに
内部ステージの種類の使い分けは良く考えて設計したいなと思いました。
作業前に想像していたよりExcelの扱いが楽で、Pandasすごい…
※↓前回のスプレッドシート版はこちら。
(追記)
こんなのもあるようです。