Fivetran の SFTP コネクタと AWS Transfer Family を使って Snowflake にデータを同期してみた
はじめに
Fivetran では、ファイル転送用のコネクタとして SFTP コネクタを提供しています。SFTP コネクタを使い AWS Transfer Family を介してファイルを Destination(Snowflake)に同期してみましたので、その際の手順を記事としました。
SFTP コネクタ
SFTP コネクタについては、以下に記載があります。
特徴として、データの削除はキャプチャしないので、データファイルが削除されても Destination 側から対象のレコードが削除されることはありません。
接続方法
サーバーへの接続方法として以下があります。
- 直接接続
- サーバーに対して直接 Fivetran の IP CIDR からの通信を許可する
- SSH トンネルによる接続
- サーバーに直接接続できない場合のオプション
- パブリックアクセス可能な踏み台サーバーを使用
認証方法
- ユーザー名・パスワード
- キーペア
- コネクタセットアップ時に、Fivetran 側で生成されるキーペアを使用
ファイル処理およびロードに使用されるプライマリキー
Destination 側で、変更をキャプチャする際のオプションとして、以下が提供されています。
- Upsert file using file name and line number
- システム列である、ファイル名(
_file
)と行番号(_line
)を使用してレコードを識別 - 同じ名称のデータファイルが追加される(更新される)とファイル名と行番号により、データがアップサートされます
- 新規のデータファイルが追加されると、新しいデータとして同期
- システム列である、ファイル名(
- Append file using file modified time
- ファイル名(
_file
)、行番号(_line
)とファイルの変更時刻(_modified
)によりレコードを識別 - ファイルの完全な履歴を追跡可能なオプション
- ファイル名(
- Upsert file using custom primary key
- 執筆時点でベータ版
- プライマリーキーとなるフィールドを指定可能なオプション
- キーに基づく最新のレコードを常に識別可能
検証環境
ここでは、以下の環境を使用しました。
- Destination
- Snowflake
- データソース
- Amazon S3(AWS Transfer Family 経由)
- 接続方法
- 直接接続
- ファイルの処理方法
- Upsert file using file name and line number
事前準備
SFTP サーバーの作成
ここでは、SFTP でファイルの送受信を行うために、AWS Transfer Family を使用しました。設定手順はこちらの記事を参考に進めました。
サーバー作成時の設定は以下の通りとしました。
- プロトコル
- SFTP
- IDプロバイダーのタイプ
- サービスマネージド
- 構成
- エンドポイントのタイプ:VPC でホスト
- アクセス:インターネット向け
- ドメイン
- Amazon S3
- セキュリティグループ
- 自身の端末から 22番ポートへのインバウンド通信を許可
サンプルデータを追加
SFTPサーバーにユーザーを追加し、データを追加します。はじめに以下のファイルを追加しました。
sample_data_20240411.csv sample_data_20240412.csv sample_data_20240413.csv sample_data_20240414.csv sample_data_20240415.csv
各データファイルには、10件のレコードがありsample_data_20240411.csv
の場合、中身は以下のようになっています。各ファイルそれぞれ [Date] 列にファイル名と対応する日付が入っています。
ID,Value,Category,Date 1,0.515959074,B,2024/4/11 2,1.19238034,C,2024/4/11 3,-1.199963559,B,2024/4/11 4,-0.565743423,B,2024/4/11 5,0.744393164,B,2024/4/11 6,0.177296019,C,2024/4/11 7,-0.631499489,A,2024/4/11 8,0.508977572,B,2024/4/11 9,1.501619026,B,2024/4/11 10,-0.361885569,B,2024/4/11
Fivetran 用のユーザーを追加
後述する手順で、Fivetran から接続する際に使用するユーザーを作成します。ここでは、ユーザーに、以下の IAM Policy に紐づくロールを割り当てました。内容としては、ロード対象とする特定のプレフィックス配下のファイルのみ取得できるような設定です。このため、上記のサンプルデータも<バケット名>/SampleData/
配下に追加しています。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowListingOfUserFolder", "Action": [ "s3:ListBucket", "s3:GetBucketLocation" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::<バケット名>" ] }, { "Sid": "FivetranAccess", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:GetObjectACL" ], "Resource": "arn:aws:s3:::<バケット名>/SampleData/*" } ] }
Fivetran の設定
Destination
データの連携先には Snowflake を使用しました。Snowflake を Destination に設定する方法は、以下の記事をご参照ください。
Connector
コネクタとして SFTP を指定します。
データの作成先となるスキーマと作成されるテーブル名を指定します。
接続情報を指定します。キーペア認証の場合 [Login with keypair?] をオンにします。すると「Public Key」 が生成されるので、この内容を AWS Transfer Family の SFTP サーバーへのユーザー追加時の「SSH パブリックキー」に指定します。
ファイルの処理方法を選択します。
以降に、ファイルパスなどその他の設定項目があります。内容は以下の通りです。
- Folder Path
- ファイルシステム内の特定の部分を指定する際に使用
- 指定のフォルダーとそのサブフォルダー内のファイルが同期対象となります
- プレフィックス指定しない場合、同期するファイルシステム全体が対象となります
- File Pattern
- 同期対象のファイル名パターンを正規表現で指定できます
- フォルダーパスで指定のパス以下に適用されます
- 何も指定しない場合、指定のパス配下のすべてのファイルが同期対象となります
- File Type
- 同期対象のファイルタイプします。以下を選択可能です。
- csv
- log
- json
- jsonl
- infer
- tsv
- avro
- parquet
- xml
- infer を指定するとファイル拡張子から推測されます
- xml の場合 infer を使用せず、タイプを明示的に指定する必要があります
- 同期対象のファイルタイプします。以下を選択可能です。
- Compression
- ファイルの圧縮有無、圧縮方法を以下から選択します
- bz2
- gz
- gzip
- zip
- tar
- tar_bz2
- tar_gz
- uncompressed
- infer
- ファイルの圧縮有無、圧縮方法を以下から選択します
- Error Handling
- 同期時のエラー処理方法を選択します
- skip
- ファイル内の形式が正しくないデータを無視して、有効なデータのみを同期
- fail
- 推奨
- ファイル内で形式が正しくないデータを検出した場合、ファイルを同期しない
- skip
- 同期時のエラー処理方法を選択します
また「Enable Advanced Options」をオンにすると下図の設定項目を追加設定可能です。
ファイルの区切り文字やヘッダの有無をここで指定可能です。
詳細は以下をご参照ください。
SFTP connector by Fivetran | Setup Guide
接続設定は以上になります。この状態から接続テストを実施可能ですが、Fivetran から接続するにあたり、必要に応じてセキュリティグループなどでアクセスを許可します。
How Fivetran connects | Fivetran Fixed IP Addresses
Supported Destinations | Fivetran connector documentation and setup
接続テストを行うと Fivetran がサーバーのホストキーの指紋(fingerprint)を取得したことが通知されましたので、[Confirm] をクリックします。
問題なければ下図の表示となります。
どの程度まで変更をキャプチャするかの確認画面が表示されるので、任意のオプションを選択します。ここでは「Allow all」を選択し [Continue] をクリックします。
以上で設定は完了です。初期同期後、Snowflake 側でもデータを確認できます。
レコード数
>SELECT COUNT(*) FROM FIVETRAN_DATABASE.SFTP.SAMPLEDATA; COUNT(*) 50
同期されたテーブルには、以下のシステム列が追加されます。
- _FILE
- 親ディレクトリを含むファイルの名前
- _LINE
- _file 内の行番号
- _MODIFIED
- ファイルがアップロードされた時間
- _FIVETRA_SYNCED
- Fivetran が最後に行を正常に同期した時刻
また、ここでは特に指定していないので、データファイルの先頭行がカラム名として認識され、Snowflake 側でも使用されています。
カラム名が含まれないデータファイルの場合は、コネクタ設定時の Advanced Options の「Headerless Files」をオンにします。
ファイルの追加
以下のファイルを追加でアップロードしてみます。
sample_data_20240416.csv sample_data_20240417.csv sample_data_20240418.csv
同期後、Snowflakeで確認すると各ファイルのデータが新規レコードとして追加されます。
レコード数
>SELECT COUNT(*) FROM FIVETRAN_DATABASE.SFTP.SAMPLEDATA; COUNT(*) 80
スキーマの異なるファイルを追加:列追加
次に [New_Column] というカラム名からなる既存ファイルにはないカラムとデータを追加したファイルをアップロードしてみます。
同期後、Snowflake で確認すると下図のようになっており、自動的にカラムが追加されていました。既存のレコードの対応するカラムは NULL となります。
レコード数
>SELECT COUNT(*) FROM FIVETRAN_DATABASE.SFTP.SAMPLEDATA; COUNT(*) 90
スキーマの異なるファイルを追加:列削除
既存のテーブルにはある [Category]、[NewColumn] 列のないデータファイルをアップロードしてみます。 同期後、Snowflake で確認すると下図のようになっており、対応するカラムは NULL となっていました。
>SELECT COUNT(*) FROM FIVETRAN_DATABASE.SFTP.SAMPLEDATA; COUNT(*) 100
既存ファイルを変更・上書き保存
次に、すでにアップロード済みのファイルである「sample_data_20240411.csv」の内容を一部変更して再度アップロードしてみます。 同期後、Snowflake で確認すると下図の通り、既存レコードが変更されていることが確認できます。今回の同期方法(Upsert file using file name and line number)の場合、レコードが更新されています。
ファイルの削除
アップロード済みのデータファイルを削除した場合、上述の通り削除の変更はキャプチャされません。Destination側に残り続けます。(論理削除もされません)
別ファイルのアップロード
さいごに既存のスキーマと全く異なるファイルをアップロードしてみます。
この場合、当然エラーとなり、下図の通り、ログでエラーとなったファイル名を確認できました。
さいごに
Fivetran の SFTP コネクタで Snowflake にデータを同期してみました。
データソースがファイルで SFTP を使用する場合も、Fivetran で同期とスキーマ移行もある程度対応してくれます。
こちらの内容が何かの参考になれば幸いです。