PythonでSnowflakeへExcelからデータを連携する

PandasとPython用Snowflakeコネクタを使って、ExcelファイルからSnowflakeテーブルへデータを連携する。
2020.07.03

データアナリティクス事業本部、池田です。
前回のブログ で、 Google スプレッドシートからSnowflakeへの連携を書いたので、 今回はExcelのデータを投入してみます。

方式の検討

今回はAmazon S3などを使わずに済ませたかったため、 Snowflakeのステージは、外部ではなく、内部ステージ を使う前提で考えました。
Excelからアップロードできるファイル(JSON)への変換はPandasを使うとして、 そのあとは…

  • Python(PUTコマンドでのアップロード) + Snowpipe + REST API
  • Python(PUTコマンドでのアップロード) + Python(COPYコマンドでのロード)

個人的にはシンプルに実装するなら後者のCOPYコマンドかと思います。 PythonアプリからCOPYコマンドでなくREST APIを使う場合の動機は、 ロードの設定をアプリ側でなくSnowflake(パイプ)側に持たせられるとか、 Snowpipeの ファイルロードメタデータ で再ロード防止したいとかでしょうか…
今回は、PythonでCOPYコマンドを使って実装をしていきます。 REST APIは機会があればブログにしたいと思います。

以下、環境は、Windows10・Python3.7・SnowflakeはSYSADMINロールで作業しています。
サンプルとして使用するExcelファイルは↓のような感じです。
input.xlsx

ロード先のSnowflakeテーブルの作成

前回 はSnowflakeのコンソールからGUIで作業したので、今回はさくっとクエリでやります。

内部ステージ のうち、 テーブルステージ を使用するので、ステージの明示的な作成はしません。 (テーブルの所有者以外がロードするような場合は、名前付きステージの方を使う必要があります。)

前回と同じく、テーブルには連携された全ての回のデータをJSONで蓄積して、ビューで最新回のみ抽出する構成です。

-- ファイル形式名・テーブル名・ビュー名
SET (format_name, table_name, view_name) = ('{ファイル形式名}', '{テーブル名}', '{ビュー名}');

-- ファイル形式
CREATE FILE FORMAT IDENTIFIER($format_name)
TYPE = 'JSON'
COMPRESSION = 'AUTO'
STRIP_OUTER_ARRAY = TRUE;

-- テーブル
CREATE TABLE IDENTIFIER($table_name)
("V" VARIANT NOT NULL)
STAGE_COPY_OPTIONS = (PURGE = TRUE);

-- ビュー
CREATE VIEW IDENTIFIER($view_name)
AS
    SELECT
        V:id_str::VARCHAR AS "ID_STR",
        V:str::VARCHAR AS "STR",
        V:date::TIMESTAMP_NTZ AS "DATE",
        V:int::INT AS "INT",
        V:str_ja::VARCHAR AS "STR_JA"
    FROM TABLE($table_name)
    WHERE
        V:import_datetime = (
            SELECT MAX(V:import_datetime)
            FROM TABLE($table_name)
        )
;

※↑コンテキストは適宜設定されている前提です。

ファイル形式STRIP_OUTER_ARRAY は、

JSON パーサーに外側の括弧(つまり [ ])を削除するよう指示するブール値。

テーブルSTAGE_COPY_OPTIONSPURGE は、

データが正常にロードされた後、ステージからデータファイルを自動的に削除するかどうかを指定するブール値。

IDENTIFIER()TABLE() はセッション変数(2行目)を うまく扱うため に噛ませています。

Snowflakeへアップロードとロードを行うPythonの作成

Pandas でExcelファイル→JSON一時ファイルに変換して、 Python用Snowflakeコネクタ (要インストール)でアップロード(PUT)とロード(COPY)のクエリを発行します。

import sys
import os
import time
import pandas as pd
import snowflake.connector

# Snowflakeの接続先情報
TABLE = "{テーブル}"
FORMAT = "{フォーマット}"
SCHEMA = "{スキーマ}"
DATABASE = "{データベース}"
WAREHOUSE = "{ウェアハウス}"
# Snowflakeの認証情報
ACCOUNT = "{アカウント}"
USER = "{ユーザ}"
PASSWORD = "{パスワード}"

AFFIX = str(round(time.time()))
TMP_JSON_FILE_NAME = "tmp_excel-to-snowflake_{}.json".format(AFFIX)

def main():
    # 実行引数で設定変更できるようにする
    args = sys.argv
    print("param: {}".format(args))
    args_len = len(args)
    target_file_name = args[1] if args_len > 1 else None
    target_sheet_index = int(args[2]) if args_len > 2 else None
    global TABLE, FORMAT, SCHEMA, DATABASE, WAREHOUSE, ACCOUNT, USER, PASSWORD
    TABLE = args[3] if args_len > 3 else TABLE
    FORMAT = args[4] if args_len > 4 else FORMAT
    SCHEMA = args[5] if args_len > 5 else SCHEMA
    DATABASE = args[6] if args_len > 6 else DATABASE
    WAREHOUSE = args[7] if args_len > 7 else WAREHOUSE
    ACCOUNT = args[8] if args_len > 8 else ACCOUNT
    USER = args[9] if args_len > 9 else USER
    PASSWORD = args[10] if args_len > 10 else PASSWORD

    # Pythonファイルのディレクトリで作業する
    os.chdir(os.path.dirname(os.path.abspath(__file__)))

    # JSONファイルの作成
    excel_to_json_file(target_file_name, target_sheet_index)

    try:
        # Snowflakeへのアップロードとロード
        import_to_sf()
    except Exception:
        raise
    finally:
        # JSONファイルの削除
        del_tmp_file()


def excel_to_json_file(target_file_name, target_sheet_index):
    target_file_name = target_file_name \
        or input("Input file name: ").rstrip() \
        or "./input.xlsx"
    target_sheet_index = target_sheet_index or 0
    print("file: {}".format(target_file_name),
          "sheet: {}".format(target_sheet_index), sep="\n")

    # pandasでExcelを読込み
    df = pd.read_excel(target_file_name, sheet_name=target_sheet_index,
                       dtype=str)  # 文字列が数値になるのを避けるためのdtype
    df["import_datetime"] = AFFIX  # ビューで絞り込みするための連携日時
    with open(TMP_JSON_FILE_NAME, "w", encoding="utf-8") as w_file:
        df.to_json(w_file,
                   orient="records", force_ascii=False, date_format="iso")


def del_tmp_file():
    os.remove(TMP_JSON_FILE_NAME)


def import_to_sf():
    input_abspath = os.path.abspath(TMP_JSON_FILE_NAME)

    # Python用Snowflakeコネクタ
    with snowflake.connector.connect(user=USER,
                                     password=PASSWORD,
                                     account=ACCOUNT,
                                     warehouse=WAREHOUSE,
                                     database=DATABASE,
                                     schema=SCHEMA) as conn, \
            conn.cursor() as cur:
        # テーブルステージへアップロード(PUT)
        q_put = "PUT %s %s;"
        param_put = ("file://{}".format(input_abspath), "@%{}".format(TABLE))
        print("query: {}".format(q_put),
              "param: {}".format(param_put), sep="\n")
        cur.execute(q_put, param_put)

        # テーブルへロード(COPY)
        q_copy = "COPY INTO IDENTIFIER(%s) " \
                 "FILE_FORMAT = (FORMAT_NAME = %s) PATTERN = %s;"
        param_copy = (TABLE, FORMAT, ".*{}.*".format(TMP_JSON_FILE_NAME))
        print("query: {}".format(q_copy),
              "param: {}".format(param_copy), sep="\n")
        cur.execute(q_copy, param_copy)


if __name__ == "__main__":
    main()

import_to_sf() 内で、各クエリを発行していますが、 COPYのクエリではテーブルステージを使用しているので、 ステージ名を省略しています。

テーブルの場所にあるファイルからデータをコピーする場合、Snowflakeはテーブルの場所にあるファイルを自動的にチェックするため、 FROM 句を省略できます。

COPYコマンド では PATTERN もしくは FILES で指定しない限り、 ステージ内の他のファイルもロード対象になるようだったので、 意図しないロードを防ぐためファイル指定しました。

動かす

Pythonコード中にSnowflakeの認証・接続先情報を記載するか、 実行時の引数で指定します。

python ./excel-to-snowflake.py ./input.xlsx 0 EXCEL_TABLE
python ./excel-to-snowflake.py {Excelファイル名} {シート番号} {テーブル名} {ファイル形式名} {スキーマ名} {DB名} {ウェアハウス名} {アカウント名} {ユーザ名} {パスワード}

↓データが連携されてビューで見ることができました。

おわりに

内部ステージの種類の使い分けは良く考えて設計したいなと思いました。

作業前に想像していたよりExcelの扱いが楽で、Pandasすごい…

※↓前回のスプレッドシート版はこちら。

Snowpipeを使ってGoogle スプレッドシートからSnowflakeにデータを連携する

参考文献