この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Golang用のSnowflake Driverを使い、ファイルを読み込んでテーブルステージにアップロードしてみました。 基本的には以下の公式のドキュメント通りなのですが、いくつか躓いたところもあったので、アップロードできたソースをここに残しておきたいと思います。
前提条件や環境について
Snowflakeのユーザやテーブルは作成済みであるものとします。今回は以下の記事の「users.csv」を「users」テーブルのテーブルステージにアップロードしました。予め作成しておいてください。
Snowflakeのテーブルステージ経由でファイルをロードしてみた
Golangの実行環境は、以下の記事のコンテナとしました。「/share/src」フォルダを作業フォルダとしたので、他の環境で行う場合は適宜読み替えてください。
Amazon Linux 2 の Dockerイメージから開発環境を作り Visual Studio Codeで接続してみる
Golangの実行環境ができたら、以下のコマンドでSnowflakeのDriverをインストールします。
$ go get github.com/snowflakedb/gosnowflake
ソースについて
まず最初にソースを貼り付けておきます。
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
sf "github.com/snowflakedb/gosnowflake" // Go Snowflake Driver
)
func main() {
accountname := os.Getenv("accountname")
username := os.Getenv("username")
password := os.Getenv("password")
database := os.Getenv("database")
schema := os.Getenv("schema")
table := "users"
readFilePath := "/share/src/users.csv"
uploadFileName := "upload_users.gz" // 拡張子をつける
parallel := 4
dsn := fmt.Sprintf("%s:%s@%s/%s/%s",
username,
password,
accountname,
database,
schema,
)
db, err := sql.Open("snowflake", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
fileStream, err := os.OpenFile(readFilePath, os.O_RDONLY, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer fileStream.Close()
sql := fmt.Sprintf("put file://%s @%%%s auto_compress=true parallel=%v;", uploadFileName, table, parallel)
_, err = db.ExecContext(sf.WithFileStream(context.Background(), fileStream), sql)
if err != nil {
log.Fatal(err)
}
fmt.Println("upload stream success!!")
}
ユーザ名やパスワードなどについては、今回は環境変数から取得するようにしました。14〜18行目で読み込んでいます。こちらについては適切な値を設定してください。
10行目でSnowflake Driverをimportしています。このインポートしたDriverの「WithFileStream()」を呼び出し、引数にファイルのStreamを渡します。(45行目)
また同じく45行目で「put」コマンドをクエリとして(変数sqlに入れて)渡しています。「put」コマンド自体は以下のような構文となります。
> put file://アップロードファイル名 @%テーブル名 auto_compress=true
このアップロードファイル名ですが、「auto_compress=true」で圧縮してアップロードする場合は圧縮後の拡張子をつけるほうがよさそうです。理由としては、アップロードしたファイルを「copy」コマンドでテーブルに取り込もうとすると、拡張子がないと以下のエラーとなったためです。(なお拡張子をつけない場合でもアップロードされたファイルには「.gz」がつきました。)
>copy into users file_format = (type = csv skip_header = 1);
100079 (22000): Invalid data encountered during decompression for file: 'upload_users.gz',compression type used: 'GZIP', cause: 'data error'
このため、21行目でアップロードファイル名には拡張子をつけています。
ソースを実行し、アップロードされたことをsnowsqlなどで「list」コマンドを実行して確認します。
>list @%users;
+-----------------+------+----------------------------------+-------------------------------+
| name | size | md5 | last_modified |
|-----------------+------+----------------------------------+-------------------------------|
| upload_users.gz | 128 | xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxxxxxxxxxxxxx |
+-----------------+------+----------------------------------+-------------------------------+
最後に
Snowflake DriverでファイルのStreamをテーブルステージにアップロードするサンプル処理を書いてみました。何かの役に立てば幸いです。