Amazon AppFlowでS3からSnowflakeへの連携を試してみた

2020.04.29

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

先日、リリースされたフルマネージドな連携サービス「Amazon AppFlow」。このサービスを使えば「Snowflakeも連携できる」ということなので、今回はS3からSnowflakeへの連携を試してみました!

SnowflakeとS3バケットの事前準備

Snowflake側の下準備として、S3に連携できるExternal Stageを準備しておく必要があります。詳しくは以下の記事をご参照ください。この準備でSnowflakeとS3バケットの準備をしておきます。

注意点1:Snowflakeのリージョン

詳しくは後述しますが、もし試される場合には2020/04/28現在においては、Snowflakeの下記 以外 のリージョンを利用する必要がありますのでご注意ください。

  • us-east-2.aws
  • ca-central-1.aws
  • ap-northeast-1.aws

注意点2:Snowflakeユーザのデフォルトロール

ユーザにはデフォルトロールを指定してください。指定したデフォルトロールが後述のAppFlowで利用されるロールとなります。とりあえず試すのであれば、ACCOUNTADMIN ロールが手っ取り早いかと思います。

ALTER USER ユーザ名 SET DEFAULT_ROLE = ACCOUNTADMIN

AppFlowでフローを作成する

Snowflake側の準備ができたら、早速フローを作成していきます。「フローを作成」ボタンをクリックして、開始します。

手順1:フローの詳細を指定

「フロー名」を指定し、今回は他はそのままにして次に進みます。

手順2:フローを設定

「送信元」と「送信先」の設定を行います。なお、2020/04/28現在では設定ミスをしたまま一度フローを保存してしまうと「送信元」と「送信先」は再編集ができないので慎重に設定を行う必要があります。

まずは「送信元」の設定です。「S3」を選択し、バケットには事前に準備したS3バケットを指定します。バケットプレフィックスには、任意のパスを指定してください。

このパスには、上記の事前準備の中でも用意したSnowflakeへロードしたい users.csv をアップロードしておきます。以前に利用したものから、少し内容を変更してIzuageを指定しました。

users.csv

id,name,age
1,Aruto Hiden,22
2,Izu,22
3,Isamu Fuwa,27
4,Yua Yaiba,24
5,Gai Amatsu,45

次に「送信先」の設定です。「Snowflake」を選択し「新規接続を作成」を選びます。

表示されるダイアログでは、次の通り指定していきます。なお、注意点なのですが、うかつにこのダイアログの外側をクリックしてはいけません。ダイアログが閉じられて、設定した値がなかったことになり、設定のやり直しになります。

「S3バケット」と「バケットプレフィックス」は、事前準備で作成したExternal Stage用のS3バケットとパスを指定します。「ステージ名」も同様です。なお、このS3パスの配下は事前に空にしておいてください。

「ウェアハウス」と「アカウント名」は問題ないかと思います。「アカウント名」はSnowflakeのログインURLが https://foobar.us-east-1.snowflakecomputing.com/console/login#/ であれば、foobar ですね。

ここで気を付ける箇所は「リージョン」です。2020/04/28現在では *.awsとなる以下のリージョンは指定するとエラーが発生します。普段ap-northeast-1.awsを使っている私は、ここにハマりました。

  • us-east-2.aws
  • ca-central-1.aws
  • ap-northeast-1.aws

なお、エラーメッセージは以下のようになります。

An error occured while creating the connection {{ profile }}
Invalid inputs: region has an invalid character

ここを越えるとSnowflakeのオブジェクトを指定できるようになります。S3のデータをロードしたいテーブルを指定しましょう。なお、このオブジェクト一覧に表示されるのは、先述の通りSnowflakeユーザのデフォルトロールで参照できるテーブルのみになるので注意が必要です。

ロードしたいテーブルを指定したら、他はデフォルトのままで次にすすみます。

手順3:データフィールドをマッピング

ここではS3に配置したファイルを、Snowflakeテーブルのカラムにどのようにマッピングするかを指定します。今回は手動マッピングを行いました。

「すべてのフィールドを直接マッピングする」を選択して、ダイアログ内でマッピングを指定します。

この時点で、送信元と送信先がきちんと設定できていれば、以下のように設定できるはずです。

うまくマッピングが出来たら、他はそのままにして次にすすみます。

手順4:フィルターを追加

今回は設定しません。次にすすみましょう。

手順5:確認して作成

内容を確認して、問題なければ画面一番右下の「フローを作成」をクリックして完了です!

いざ、実行!

まずは、Snowflake側で「送信先」に指定したテーブルを空にしておきましょう。

USE ROLE accountadmin;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA public;

TRUNCATE TABLE public.users;

SELECT * FROM public.users;

テーブルが空になっていることを確認したら、いよいよフローを動かします。

この時点ではS3バケットは以下のような感じになっています。

foobar (←バケット名)
┗ snowflake
  ┣ flow
  ┃ ┗ users.csv
  ┗ load

作成したフローを開いて「フローを実行」をクリックしてみましょう!

実行が開始され…

正常に終わりました!

Snowflake側で確認します。

USE ROLE accountadmin;
USE DATABASE OOTAKA_SANDBOX_DB;
USE SCHEMA public;

SELECT * FROM public.users;

無事に連携されてますね!

S3バケットの中身がどうなったかを見てみると、以下のようになっていました。

foobar (←バケット名)
┗ snowflake
  ┣ flow
  ┃ ┗ users.csv
  ┗ load
    ┗ cm-ootaka-snowflake-flow
      ┗ 94c2911fe16b8b2acdd590d619baa547

フローで指定したパスに1階層「フロー名」と同じものが出来ており、その下にオブジェクトが配置されていました。中身を見てみます。

94c2911fe16b8b2acdd590d619baa547

{"ID":"1","NAME":"Aruto Hiden","AGE":"22"}
{"ID":"2","NAME":"Izu","AGE":"22"}
{"ID":"3","NAME":"Isamu Fuwa","AGE":"27"}
{"ID":"4","NAME":"Yua Yaiba","AGE":"24"}
{"ID":"5","NAME":"Gai Amatsu","AGE":"45"}

なんと、中身はJSONL形式のファイルでした。このあたりは、SnowflakeのExternal Stageとして設定した以下のCSVファイルのロード設定とは異なっており、気になるところです。もし、理由が分かったら後日に追記したいと思います。

use schema ootaka_sandbox.public;

create or replace file format cm_ootaka_csv_format
  type = 'CSV'
  field_delimiter = ','
  skip_header = 1;

create stage cm_ootaka_ext_stage_s3
  storage_integration = cm_ootaka_ext_stage_s3
  url = 's3://foobar/snowflake/load/'
  file_format = cm_ootaka_csv_format;

おまけ:フロー実行時の失敗談

実は、最初に試した際には「送信元」のCSVファイルは以下のようなファイルでした。(Izuageを指定していない)

users.csv

id,name,age
1,Aruto Hiden,22
2,Izu,
3,Isamu Fuwa,27
4,Yua Yaiba,24
5,Gai Amatsu,45

この場合、なんらかの問題が起きるのか以下のようなエラーとなりました。おそらくcsvの空指定の箇所の処理で問題が起きたと考えられるので、csvの場合は少し注意が必要そうですね。

Failed to execute snowflake copy due to compilation error: Failed to cast variant value "" to FIXED, RequestId: 025313422841-c588545baeef365d6acaf428a6e36408

まとめ

以上、Amazon AppFlowのS3からSnowflakeへの連携のご紹介でした。まだ出たばかりのサービスなので、色々とハマりどころはあったものの、ノンコーディングでSnowflakeへデータ連携ができるサービスが出てきたのは本当に嬉しいです!これからの発展が期待されます。

どなたかのお役に立てば幸いです。それでは!