この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部、池田です。
前回のブログ で、
Google スプレッドシートからSnowflakeへの連携を書いたので、
今回はExcelのデータを投入してみます。
方式の検討
今回はAmazon S3などを使わずに済ませたかったため、
Snowflakeのステージは、外部ではなく、内部ステージ
を使う前提で考えました。
Excelからアップロードできるファイル(JSON)への変換はPandasを使うとして、
そのあとは…
- Python(PUTコマンドでのアップロード) + Snowpipe + REST API
- Python(PUTコマンドでのアップロード) + Python(COPYコマンドでのロード)
個人的にはシンプルに実装するなら後者のCOPYコマンドかと思います。
PythonアプリからCOPYコマンドでなくREST APIを使う場合の動機は、
ロードの設定をアプリ側でなくSnowflake(パイプ)側に持たせられるとか、
Snowpipeの ファイルロードメタデータ
で再ロード防止したいとかでしょうか…
今回は、PythonでCOPYコマンドを使って実装をしていきます。
REST APIは機会があればブログにしたいと思います。
以下、環境は、Windows10・Python3.7・SnowflakeはSYSADMINロールで作業しています。
サンプルとして使用するExcelファイルは↓のような感じです。
input.xlsx
ロード先のSnowflakeテーブルの作成
前回 はSnowflakeのコンソールからGUIで作業したので、今回はさくっとクエリでやります。
内部ステージ のうち、 テーブルステージ を使用するので、ステージの明示的な作成はしません。 (テーブルの所有者以外がロードするような場合は、名前付きステージの方を使う必要があります。)
前回と同じく、テーブルには連携された全ての回のデータをJSONで蓄積して、ビューで最新回のみ抽出する構成です。
-- ファイル形式名・テーブル名・ビュー名
SET (format_name, table_name, view_name) = ('{ファイル形式名}', '{テーブル名}', '{ビュー名}');
-- ファイル形式
CREATE FILE FORMAT IDENTIFIER($format_name)
TYPE = 'JSON'
COMPRESSION = 'AUTO'
STRIP_OUTER_ARRAY = TRUE;
-- テーブル
CREATE TABLE IDENTIFIER($table_name)
("V" VARIANT NOT NULL)
STAGE_COPY_OPTIONS = (PURGE = TRUE);
-- ビュー
CREATE VIEW IDENTIFIER($view_name)
AS
SELECT
V:id_str::VARCHAR AS "ID_STR",
V:str::VARCHAR AS "STR",
V:date::TIMESTAMP_NTZ AS "DATE",
V:int::INT AS "INT",
V:str_ja::VARCHAR AS "STR_JA"
FROM TABLE($table_name)
WHERE
V:import_datetime = (
SELECT MAX(V:import_datetime)
FROM TABLE($table_name)
)
;
※↑コンテキストは適宜設定されている前提です。
ファイル形式
の STRIP_OUTER_ARRAY
は、
JSON パーサーに外側の括弧(つまり [ ])を削除するよう指示するブール値。
テーブル
の STAGE_COPY_OPTIONS
の PURGE
は、
データが正常にロードされた後、ステージからデータファイルを自動的に削除するかどうかを指定するブール値。
IDENTIFIER()
と TABLE()
はセッション変数(2行目)を
うまく扱うため
に噛ませています。
Snowflakeへアップロードとロードを行うPythonの作成
Pandas でExcelファイル→JSON一時ファイルに変換して、 Python用Snowflakeコネクタ (要インストール)でアップロード(PUT)とロード(COPY)のクエリを発行します。
import sys
import os
import time
import pandas as pd
import snowflake.connector
# Snowflakeの接続先情報
TABLE = "{テーブル}"
FORMAT = "{フォーマット}"
SCHEMA = "{スキーマ}"
DATABASE = "{データベース}"
WAREHOUSE = "{ウェアハウス}"
# Snowflakeの認証情報
ACCOUNT = "{アカウント}"
USER = "{ユーザ}"
PASSWORD = "{パスワード}"
AFFIX = str(round(time.time()))
TMP_JSON_FILE_NAME = "tmp_excel-to-snowflake_{}.json".format(AFFIX)
def main():
# 実行引数で設定変更できるようにする
args = sys.argv
print("param: {}".format(args))
args_len = len(args)
target_file_name = args[1] if args_len > 1 else None
target_sheet_index = int(args[2]) if args_len > 2 else None
global TABLE, FORMAT, SCHEMA, DATABASE, WAREHOUSE, ACCOUNT, USER, PASSWORD
TABLE = args[3] if args_len > 3 else TABLE
FORMAT = args[4] if args_len > 4 else FORMAT
SCHEMA = args[5] if args_len > 5 else SCHEMA
DATABASE = args[6] if args_len > 6 else DATABASE
WAREHOUSE = args[7] if args_len > 7 else WAREHOUSE
ACCOUNT = args[8] if args_len > 8 else ACCOUNT
USER = args[9] if args_len > 9 else USER
PASSWORD = args[10] if args_len > 10 else PASSWORD
# Pythonファイルのディレクトリで作業する
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# JSONファイルの作成
excel_to_json_file(target_file_name, target_sheet_index)
try:
# Snowflakeへのアップロードとロード
import_to_sf()
except Exception:
raise
finally:
# JSONファイルの削除
del_tmp_file()
def excel_to_json_file(target_file_name, target_sheet_index):
target_file_name = target_file_name \
or input("Input file name: ").rstrip() \
or "./input.xlsx"
target_sheet_index = target_sheet_index or 0
print("file: {}".format(target_file_name),
"sheet: {}".format(target_sheet_index), sep="\n")
# pandasでExcelを読込み
df = pd.read_excel(target_file_name, sheet_name=target_sheet_index,
dtype=str) # 文字列が数値になるのを避けるためのdtype
df["import_datetime"] = AFFIX # ビューで絞り込みするための連携日時
with open(TMP_JSON_FILE_NAME, "w", encoding="utf-8") as w_file:
df.to_json(w_file,
orient="records", force_ascii=False, date_format="iso")
def del_tmp_file():
os.remove(TMP_JSON_FILE_NAME)
def import_to_sf():
input_abspath = os.path.abspath(TMP_JSON_FILE_NAME)
# Python用Snowflakeコネクタ
with snowflake.connector.connect(user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA) as conn, \
conn.cursor() as cur:
# テーブルステージへアップロード(PUT)
q_put = "PUT %s %s;"
param_put = ("file://{}".format(input_abspath), "@%{}".format(TABLE))
print("query: {}".format(q_put),
"param: {}".format(param_put), sep="\n")
cur.execute(q_put, param_put)
# テーブルへロード(COPY)
q_copy = "COPY INTO IDENTIFIER(%s) " \
"FILE_FORMAT = (FORMAT_NAME = %s) PATTERN = %s;"
param_copy = (TABLE, FORMAT, ".*{}.*".format(TMP_JSON_FILE_NAME))
print("query: {}".format(q_copy),
"param: {}".format(param_copy), sep="\n")
cur.execute(q_copy, param_copy)
if __name__ == "__main__":
main()
import_to_sf()
内で、各クエリを発行していますが、
COPYのクエリではテーブルステージを使用しているので、
ステージ名を省略しています。
テーブルの場所にあるファイルからデータをコピーする場合、Snowflakeはテーブルの場所にあるファイルを自動的にチェックするため、 FROM 句を省略できます。
COPYコマンド
では PATTERN
もしくは FILES
で指定しない限り、
ステージ内の他のファイルもロード対象になるようだったので、
意図しないロードを防ぐためファイル指定しました。
動かす
Pythonコード中にSnowflakeの認証・接続先情報を記載するか、 実行時の引数で指定します。
python ./excel-to-snowflake.py ./input.xlsx 0 EXCEL_TABLE
python ./excel-to-snowflake.py {Excelファイル名} {シート番号} {テーブル名} {ファイル形式名} {スキーマ名} {DB名} {ウェアハウス名} {アカウント名} {ユーザ名} {パスワード}
↓データが連携されてビューで見ることができました。
おわりに
内部ステージの種類の使い分けは良く考えて設計したいなと思いました。
作業前に想像していたよりExcelの扱いが楽で、Pandasすごい…
※↓前回のスプレッドシート版はこちら。
(追記)
こんなのもあるようです。