Golang用のSnowflake DriverでファイルStreamをテーブルステージにアップロードしてみる

2021.09.16

Golang用のSnowflake Driverを使い、ファイルを読み込んでテーブルステージにアップロードしてみました。 基本的には以下の公式のドキュメント通りなのですが、いくつか躓いたところもあったので、アップロードできたソースをここに残しておきたいと思います。

gosnowflake - README

前提条件や環境について

Snowflakeのユーザやテーブルは作成済みであるものとします。今回は以下の記事の「users.csv」を「users」テーブルのテーブルステージにアップロードしました。予め作成しておいてください。

Snowflakeのテーブルステージ経由でファイルをロードしてみた

Golangの実行環境は、以下の記事のコンテナとしました。「/share/src」フォルダを作業フォルダとしたので、他の環境で行う場合は適宜読み替えてください。

Amazon Linux 2 の Dockerイメージから開発環境を作り Visual Studio Codeで接続してみる

Golangの実行環境ができたら、以下のコマンドでSnowflakeのDriverをインストールします。

$ go get github.com/snowflakedb/gosnowflake

ソースについて

まず最初にソースを貼り付けておきます。

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"os"

	sf "github.com/snowflakedb/gosnowflake" // Go Snowflake Driver
)

func main() {
	accountname := os.Getenv("accountname")
	username := os.Getenv("username")
	password := os.Getenv("password")
	database := os.Getenv("database")
	schema := os.Getenv("schema")
	table := "users"
	readFilePath := "/share/src/users.csv"
	uploadFileName := "upload_users.gz" // 拡張子をつける
	parallel := 4

	dsn := fmt.Sprintf("%s:%s@%s/%s/%s",
		username,
		password,
		accountname,
		database,
		schema,
	)

	db, err := sql.Open("snowflake", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	fileStream, err := os.OpenFile(readFilePath, os.O_RDONLY, os.ModePerm)
	if err != nil {
		log.Fatal(err)
	}
	defer fileStream.Close()

	sql := fmt.Sprintf("put file://%s @%%%s auto_compress=true parallel=%v;", uploadFileName, table, parallel)
	_, err = db.ExecContext(sf.WithFileStream(context.Background(), fileStream), sql)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("upload stream success!!")
}

ユーザ名やパスワードなどについては、今回は環境変数から取得するようにしました。14〜18行目で読み込んでいます。こちらについては適切な値を設定してください。

10行目でSnowflake Driverをimportしています。このインポートしたDriverの「WithFileStream()」を呼び出し、引数にファイルのStreamを渡します。(45行目)

また同じく45行目で「put」コマンドをクエリとして(変数sqlに入れて)渡しています。「put」コマンド自体は以下のような構文となります。

> put file://アップロードファイル名 @%テーブル名 auto_compress=true

このアップロードファイル名ですが、「auto_compress=true」で圧縮してアップロードする場合は圧縮後の拡張子をつけるほうがよさそうです。理由としては、アップロードしたファイルを「copy」コマンドでテーブルに取り込もうとすると、拡張子がないと以下のエラーとなったためです。(なお拡張子をつけない場合でもアップロードされたファイルには「.gz」がつきました。)

>copy into users file_format = (type = csv skip_header = 1);
100079 (22000): Invalid data encountered during decompression for file: 'upload_users.gz',compression type used: 'GZIP', cause: 'data error'

このため、21行目でアップロードファイル名には拡張子をつけています。

ソースを実行し、アップロードされたことをsnowsqlなどで「list」コマンドを実行して確認します。

>list @%users;
+-----------------+------+----------------------------------+-------------------------------+
| name            | size | md5                              | last_modified                 |
|-----------------+------+----------------------------------+-------------------------------|
| upload_users.gz |  128 | xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxxxxxxxxxxxxx |
+-----------------+------+----------------------------------+-------------------------------+

最後に

Snowflake DriverでファイルのStreamをテーブルステージにアップロードするサンプル処理を書いてみました。何かの役に立てば幸いです。

参考サイト