Amazon Kinesis Data FirehoseからS3にロードされたデータをFivetranを使ってSnowflakeに連携する

ファイホをファイトラでアナリティクる
2020.08.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

奈良県でリモートワーク中の玉井です。

FivetranはSalesforceやZendeskといったSaaSの半構造化データを、DWH等に構造化データとして自動変換・連携できる便利なサービスです。実は、SaaS以外にも連携できるサービスはまだまだあって、今回はAmazon Kinesis Data FirehoseでS3にロードしたデータをSnowflakeに連携してみます。

FivetranとFirehoseの連携について

何らかのアプリケーションで発生したデータを都度処理する…という要件は非常に多いと思います。で、それらのデータをDWHに溜めて分析したいという要件も、これまた多いです。で、そういう要件にあった基盤の構成は色々あり、AWSを使う場合は、Kinesisファミリー(?)を使った構成が多いと思います。

その中の選択肢の1つとして、Fivetranを使うというものがあります。下記のようなイメージです。

本記事では上記(のDWHに入れるまで)を実際にやってみました。

やってみた(セットアップ編)

公式情報

前提条件

  • Amazon Kinesis Data Firehoseの配信ストリームの作成が完了している
    • S3にストリーミングデータが格納される状態
  • S3に格納されるデータの形式がJSONである
    • FivetranとFirehoseを連携させる条件の1つ
    • 今回はFirehoseのサンプルデータを使用
  • FivetranのDestinationの設定が完了している
    • 今回はSnowflakeをセットアップ済

Fivetran側の作業

External IDを確認する

まずConnectorから(Kinesisではなく)「S3」のメニューを開きます。

設定の画面に遷移しますが、実際にS3をConnectorとして登録する必要はありません。ここでの目的はExternal IDの確認です。後々使用するので、メモったらこの画面は閉じます。

AWS側の作業

基本的にはFivetranがS3(Kinesisからのデータが格納されるバケット)に対してアクセスできるように設定するのがメインです。

Fivetran用のIAMポリシーの作成

マネジメントコンソールにログインして、ポリシーの作成画面に移動します。

下記のポリシーをJSONタブにペーストします(バケット名は自分の環境に合わせます)。

{
"Version": "2012-10-17",
"Statement": [
    {
      "Effect": "Allow",
      "Action": [
"s3:Get*",
"s3:List*"
      ],
      "Resource": "arn:aws:s3:::{your-bucket-name}/*"
    },
    {
      "Effect": "Allow",
      "Action": [
"s3:Get*",
"s3:List*"
      ],
      "Resource": "arn:aws:s3:::{your-bucket-name}"
    }
  ]
}

ポリシーの名前を入力して、作成を完了します。

Fivetran用のIAMロールの作成

続いてロールの作成に入ります。IAMロールの作成画面に移動します。

「別のAWSアカウント」を選び、アカウントIDに834469178297を入力します(これはFivetran側のAWSアカウントIDなので固定です)。そして下の外部IDに、先程Fivetran側の画面で確認したS3用のExternal IDを入力します

さっき作成したIAMポリシーを選びます。

ロールの名前を入れて、作成を完了します。

後で使用するので、ロールのARNをメモっておきます。

Fivetran側の作業

Connectorsの作成

Firehose(Fivetran上ではKinesisという名称)をConnectorとして登録します。

スキーマ名とテーブル名を設定します(自動的にSnowflake側に作成される)。Bucketは、Firehoseがデータを格納するS3バケットの名前を入れます。Role ARNは先程メモっておいたIAMロールのARNを入力します。

すると、接続に問題がないかどうかテストが開始されます。もしエラーが出たら設定に誤りがないか確認しましょう。問題なければ次の画面に進みます。

初回のデータ連携

自動的に初回のデータ連携が開始されます。この際、スキーマとテーブルも作成されます。いったんSnowflake側を確認してみましょう。

しっかりスキーマとテーブルが作成されていました。事前にテストでデータをストリーミングしており、その分が初回連携で既に格納されていました。

やってみた(データ連携編)

テストデータのストリーミングを実行する

Firehoseのテスト用ストリームを数分程度実行します。

Fivetranの連携を実行してみる

テスト用ストリームの実行前は1180件でした。

そして、Fivetran側でConnectorのSyncを手動実行します。

実行後は1420件になってました。

データ連携の頻度は設定で変更可能

S3にストリームされて溜まってきたデータを、どのくらいの頻度で連携させるかは、Connectorの設定で変更することができます。

データ型がJSONじゃない場合

前提条件にも書いたように、この連携ではストリームされるデータの形式がJSONである必要があります。もしJSON以外の形式でデータがストリーミングされる場合、AWS Lambdaを使用してJSONに変換してからS3に格納する必要があります。

それに関しては、下記に詳細が書いてあります。

おわりに

アプリ上のイベント等、ストリーミングデータをDWHに貯めて分析するケースは結構あると思うので、そういう時もFivetranは便利ですね。