Snowpipeを使ってGoogle スプレッドシートからSnowflakeにデータを連携する

SnowflakeのSnowpipeによる継続的なロード機能と Google Apps Scriptを利用して、Google スプレッドシートのデータを連携する。
2020.06.25

データアナリティクス事業本部、池田です。
データウェアハウスにGoogle スプレッドシートやExcelからちょっとしたデータを入れたい時ってありますよね。 テストデータにするとかマスタと結合したいとか。(無いかな?BIツール側で結合してしまえば済む話?) とりあえず私はたまにあるのでスプレッドシートからSnowflakeにデータを連携してみることにしました。

最終的にできたこと

スプレッドシートから連携用のボタンを押して、

Snowflakeに格納されます。

だいぶ長いので、とりあえず パイプの作成GASの作成 の辺りだけ眺めると、分かった気になれるかと思います。

Snowflake Communityの内容をやってみる

まずはコミュニティの こちらの記事 の内容をやってみました。 (結果として、私の知識と弊社環境だと少ししんどかったです。)

本題ではないので、以下やったことをざっくりです。

  • まずは Googleのサービス アカウント を準備するところまで。

    1. Google Cloud Platform(GCP)のコンソールでプロジェクトを作成
    2. 「APIとサービス」ページにて「Google Sheets API」を有効化
    3. 有効化した「Google Sheets API」のページから「認証情報」を「サービス アカウント」に対して作成
      ここでサービス アカウントも同時に作成(ロールは設定無しでも動作はしました)
    4. 作成したサービス アカウントに対して、Snowflakeに連携したいスプレッドシートを共有
      弊社のG Suite環境下ではサービス アカウント(メールのドメインが異なる)に対してスプレッドシートを共有できなかったので、 G Suite外のアカウントで作成したスプレッドシートを使いました。
    5. 作成したサービス アカウントの詳細ページにて、キーを作成し、JSON形式でダウンロード
  • ここからは 先述の記事内のリンク先GitHub の内容に沿っての作業です。 ( ライセンス情報

    1. db.json にSnowflakeの認証&接続先情報を記入し、 create_table_from_sheet.py と同じフォルダに配置
      • (スキーマまでは必要でしたが、テーブル作成は不要でした)
    2. 前述のサービス アカウントのキー情報を service-account.json にリネームし、前項のフォルダに配置 (もしくは実行引数でキー情報のファイル名を指定するのでも良さそう)
    3. Pythonモジュールのインストール
      (snowflake-connector-python python-dateutil pygsheets)
    4. GitHubから取得したPythonファイルを実行
      python create_table_from_sheet.py --schema {作成済みスキーマ名} --table {作成するテーブル名} --sheet {スプレッドシートID}

ちなみに、記事に記載があった通り、

this tool replaces the entire table every time

毎回テーブルは再作成 されていました。 Pythonのコード読むと CREATE OR REPLACE TABLE してから INSERT INTO のSQLを Pythonのコネクタ で実行しているみたいです。

ちょっとサービス アカウント周りがしんどかったので、 SnowpipeGoogle Apps Script(以下GAS)辺りでなんとかならないかと思ったのが、次章以降の動機です。

Snowpipeを使ってみる

先にSnowflakeの Snowpipe という機能を使って、 ストレージ(今回はAmazon S3)に配置されたファイルが自動的にSnowflakeのテーブルに格納 されるようにします。

SnowpipeはSnowflakeの継続的なデータインジェスションサービスです。

へー。

次節からは例として、Snowflake上の DEVIO というDBの PUBLIC スキーマを使い、 作業は全て SYSADMIN ロールで行っています。

(以降に記載するクエリも上記のようにコンテキストが設定されている前提で、DB名などを省略しています。)

↓使用するS3はこんな感じ。 ※イベントは後ほど設定。

↓使用するテストファイルはこんな感じ。
s3://{バケット名}/snowflake/input/test.json

[
    {
        "import_datetime": "2020-06-24 11:11:11",
        "id": "1",
        "code": "abc",
        "date": "03/01/2019",
        "cost": 100
    },
    {
        "import_datetime": "2020-06-24 11:11:11",
        "id": "2",
        "code": "xyz",
        "date": "04/01/2019",
        "cost": 200
    }
]

データは冒頭の検証で使っていたものに、1列目に日時(用途は後述)を加えてJSONにしたものです。
TSVでなくJSONにしたのは、スプレッドシート側のカラム追加に対応しやすそうな気がしたからです。たぶん。 日付の形式がちょっとあれですが、きっとSnowflakeがあとでうまくやってくれます。たぶん。

ステージの作成

DB内にステージを作成します。ロード元の設定みたいなやつです。
今回はS3を 外部ステージ として設定します。

既存のS3を選択して作成を始めます。

ファイルの配置先と AWSに接続するための情報 を入力します。 (例として、ステージ名は SHEET_EXTERNAL_STAGE としています。)

↓ちなみにクエリで作るとこんな感じ。

CREATE STAGE {ステージ名}
URL = 's3://{バケット名}/snowflake/input/'
CREDENTIALS = (AWS_KEY_ID = '{Access key ID}' AWS_SECRET_KEY = '{Secret access key}');

LIST @{ステージ名}; のようなクエリを発行して前述のファイルを確認できればうまく接続できています。

ファイル形式の作成

ファイルの形式 を作成します。先ほどのステージ上のファイルの形式の設定です。 (例として、ファイル形式名は SHEET_FORMAT としています。)

「外部配列を削除」は、

ルートブラケット [ ] を削除するように JSON パーサーに指示します。

だそうです。

↓ちなみにクエリで作るとこんな感じ。

CREATE FILE FORMAT {ファイル形式名}
TYPE = 'JSON'
COMPRESSION = 'AUTO'
STRIP_OUTER_ARRAY = TRUE;

↓のクエリで先述のファイルの中身が複数レコードに分かれて見れたら良さげです。

SELECT *
FROM @SHEET_EXTERNAL_STAGE (file_format => SHEET_FORMAT)
LIMIT 10;

テーブルの作成

はい作ります。 (例として、テーブル名は IMPORT_BY_SNOWPIPE としています。)

Snowflakeで見かける VARIANT 型ですね。

↓ちなみにクエリで作るとこんな感じ。

CREATE TABLE "{テーブル名}"
("V" VARIANT NOT NULL);

パイプの作成

ようやく本体っぽいところです。

パイプ名とスキーマを指定します。(例として、パイプ名は SHEET_PIPE としています。)

ここまでに作成したステージとファイル形式を指定します。 今回はファイル配置ですぐ取込みたいので、「自動取込みを有効化」しておきます。

で、最後に格納先として作成したテーブルを選択します。

↓ちなみにクエリで作るとこんな感じ。

CREATE PIPE {パイプ名}
AUTO_INGEST = TRUE
AS
    COPY INTO {DB名}.{スキーマ名}.{テーブル名}
    FROM @{DB名}.{スキーマ名}.{ステージ名}
    FILE_FORMAT = (FORMAT_NAME = {DB名}.{スキーマ名}.{ファイル形式名});

S3イベントの設定

この辺のドキュメント を参考に、自動で取込みが行われるようにS3のイベントを設定していきます。

設定の前にどのAmazon SQSに通知するのか、ARNをコピーしておきます。

AWSのコンソールから対象のS3に対していい感じに設定します。

Name :イベント通知の名前(例: Auto-ingest Snowflake)。
Events : ObjectCreate (All) オプションを選択します。
Send to :ドロップダウンリストから SQS Queue を選択します。
SQS :ドロップダウンリストから Add SQS queue ARN を選択します。
SQS queue ARN : SHOW PIPES 出力から SQS キュー名を貼り付けます。

動かす

私の場合は既に配置済みだったファイルが、Snowpipeによって取り込まれていました。 (この辺の挙動はイベントの設定方法やタイミングによるかもしれません…)

別名で中身も変更したファイルをS3へ配置してみました。

取り込まれました。

体感だと、ファイル配置から数十秒でクエリの結果に反映された気がします。たぶん。

うまくいかない時は、 トラブルシューティング にある通り、
SELECT SYSTEM$PIPE_STATUS('{パイプ名}'); でイベントが届いているか調べたり、
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(TABLE_NAME=>'{テーブル名}', START_TIME=> DATEADD(HOURS, -2, CURRENT_TIMESTAMP()))); でCOPYが動いているか調べたりするとなんとかなりそうです。

ファイルを同名で上書きをした場合どうなるか試したのですが、 ドキュメントに回答 がありました。

14日以内に変更され、再度ステージングされたファイル: Snowpipeは、再度ステージングされた変更済みファイルを無視します。

14日後に変更され、再びステージングされたファイル: Snowpipeはデータを再度ロードするため、ターゲットテーブルにレコードが重複する可能性があります。

へー。

GASでS3へJSONファイルをアップロードする

前章のSnowpipeによって、S3にファイルが置けさえすれば取り込まれるようにはなりました。 あとは、なんとかしてスプレッドシートの内容をS3に置くだけです。

スプレッドシートの作成

↓こんな感じです。

import_datetime 列は =TEXT(NOW(),"yyyy-MM-dd HH:mm:ss") を設定していて、
iddate 列も文字列として作成しています。

GASの作成

既に弊社ブログに スプレッドシート→S3(JSON)の記事 がありました。 基本的にその内容をやるだけなのですが、2点変更しています。

  1. 配置ファイル名の変更
    →前述のブログだと、再出力の際はS3上のファイルは同名で上書きをしているので、 そうならないようにUnix timeをファイル名にするよう変更しました。

    var filePath = props.getProperty("FILE_PATH") + String(Math.floor( new Date() / 1000 )) + ".json";
  2. スプレッドシートへメニューの追加
    →スクリプトエディタでなく、スプレッドシートの画面から起動できるようにしました。

    function onOpen() {
      var ui = SpreadsheetApp.getUi();
      var menu = ui.createMenu("Snowflake連携");
      menu.addItem("実行","uploadS3");
      menu.addToUi();
    }


動かす

追加されているメニューから、GASを実行します。

Snowpipeが動いてくれて、Snowflake上のテーブルから確認できました。

ビューの作成

最後にJSONを分解してVIEWにします。

CREATE VIEW SHEET_VIEW
AS
    SELECT
        V:id::VARCHAR AS "ID",
        V:code::VARCHAR AS "CODE",
        NULLIF(V:date, '')::TIMESTAMP_NTZ AS "DATE",
        NULLIF(V:cost, '')::INT AS "COST"
    FROM IMPORT_BY_SNOWPIPE
    WHERE
        V:import_datetime = (
            SELECT MAX(V:import_datetime)
            FROM IMPORT_BY_SNOWPIPE
        )
;

データは全てではなく、最後にスプレッドシートから連携されたものだけになるように 追加しておいた日時を利用して WHERE で調整しています。

※スプレッドシートの内容を変更せずに連携を繰り返すと、 関数の再計算が行われないために正しくビューに表示できないことがあります。

おわりに

Snowpipeはいろいろ便利な使い道がありそうです。 GUIでもクエリでも組み立てられるので、楽ですね。

(本当は… Snowflakeの 内部ステージ を使うことで、S3を介さなくても連携できるようにしたかったのですが…… GASからどうやってファイルを送るかが思いつかず………)

※↓Excel版もブログにしました。

PythonでSnowflakeへExcelからデータを連携する

参考文献