AWS LambdaとFivetranを連携して、長州力のツイートをSnowflakeにロードするぞタココラ 井長州力

てめーら!この状態が今何を求めているのかわかるか?。俺はこのクラスメソッドのど真ん中に立ってるんだぞ、今!
2020.08.11

大阪オフィス所属だが現在は奈良県でリモートワーク中の玉井です。

Fivetranは何とAWS Lambdaとも連携することができます。Lambdaと連携するとはどういうことか?という解説と、実際の「やってみた」を併せてお届けします。

※便宜上、この記事ではLambda関数と呼称していますが、実はAzure FunctionsとGoogle Cloud Functionsにも対応しています。

AWS LambdaとFivetranを連携するとは…?

Lambda関数経由で取得したデータを自動連携する

FivetranはSaaS(Salesforceとか)のデータや、各種DBのデータを、1つのDWHに簡単に連携・集約することができるツールです。データパイプラインツールとも呼称されていますが、データを扱うため、連携先は当然ながら、連携元も何かしらのデータ(もしくはデータを持っている何か)となります。

しかし、AWS Lambdaというサービスは、何かデータを持っているサービスではありません。イベントドリブンで任意のコードを実行できるサーバーレスのサービスです。これとFivetranを連携すると、どういうデータが連携できるのでしょうか?

ざっくりいうと、何かしらのデータを取得・変換するLambda関数を、Fivetranに定期的に実行させて、Lambda関数経由で取得したデータを、DWHに格納する…という感じです。

なかなかイメージが湧かないと思うので、記事後半の「やってみた」で設定する全体図のイメージを載せます。

上記の場合ですと、とあるユーザーのツイートを取得するLambda関数を用意し、それをFivetranに実行させます。Lambda関数で取得したツイートデータはそのままFivetranに返り、Fivetranはそのツイートデータを連携先のDWHに合わせて自動連携する…という流れになります。

AWS LambdaとFivetranを連携するユースケース

公式ドキュメントによれば、下記のような場合に役立つとのことです。

  • Fivetranに存在しないAPIから取得できるデータを連携したいとき
    • Twitterはこのケースに当てはまりますね(Twitter AdsというConnectorはある)
  • プライベートAPI(独自のシステム用に作成されたもの等)から取得できるデータを連携したいとき
  • 連携前に特別な処理(データ変換、フィルタリング、匿名化など)が必要なデータを連携したいとき

これらの場合、データを取得する部分だけLambda関数として定義しておけば、あとがFivetranがやってくれるという形となります。

Fivetranと連携するためのLambda関数について

ここまできて気になる方もいらっしゃるかもしれませんが、Fivetranと連携させるLambda関数には、いくつか満たさないといけない仕様があります。簡単にいうと、リクエスト(受け取るイベント)とレスポンスの形がある程度決まっているので、それに沿ったLambda関数を作成する必要があります。

リクエスト

最低でもstatesecretsという2つのパラメータを受け取るようにする必要があります。

state

「前回成功したLambda関数に関するオブジェクト」になります。めっちゃざっくりいうと、前回実行したLambda関数の実行日付になります。これを用いて、増分更新となるようにLambda関数を作成します。

secrets

Lambda関数内でアクセスするAPIの認証情報です。通常、そういう情報は環境変数に設定しますが、Fivetranと連携する場合、認証情報はFivetran側に持たせるので(後述)、リクエストとして受け付ける形になります。

サンプル(※公式ドキュメント記載のもの)

{
    "agent" : "<function_connector_name>/<external_id>/<schema>",
    "state": {
        "cursor": "2018-01-01T00:00:00Z"
    },
    "secrets": {
        "apiToken": "abcdefghijklmnopqrstuvwxyz_0123456789"
    }
}

レスポンス

この場合におけるレスポンスは、すなわちFivetranに渡すレスポンスということになります。

stateinsertdeleteschemahasMoreの5つが必要です(Fivetranはこの5つのパラメータを元に、DWH側にデータを入れます)。

state

Lambda関数を実行した時点のデータ(日付など)になります。Fivetranはこれを使って、次回以降の更新を差分更新として扱います。

insert

DWHに挿入するデータ(カラムとレコード)です。

delete

DWHから削除するデータです。ただし、Fivetranは実際にレコードを削除するのではなく、_fivetran_deletedというカラムの値をtrueに設定し、削除フラグを立てます。

ちなみに、今回の検証でわかったのですが、このdeleteは必須ではありませんでした(無くても動く)。

schema

各エンティティの主キー(連携後の観点でいうと、各テーブルの主キー)を指定します。

hasMore

Fivetranは、この値をTrueとして受け取った場合、「まだレコードがある」と認識し、Lambda関数を再び実行し、さらにデータを取得しにいきます(Falseになるまでデータを取得し続ける。DWHにデータを連携しにいかない)。

大量のデータを一気に取得・ロードしたい場合に有効だと思います。

サンプル(※公式ドキュメント記載のもの)

{
    "state": {
        "transaction": "2018-01-02T00:00:00Z",
        "campaign": "2018-01-02T00:00:01Z"
    },
    "insert": {
        "transaction": [
            {"id":1, "amount": 100},
            {"id":2, "amount": 50}
        ],
        "campaign": [
            {"id":101, "name": "Christmas"},
            {"id":102, "name": "New Year"}
        ]
    },
    "delete": {
        "transaction": [
            {"id":3},
            {"id":4}
        ],
        "campaign": [
            {"id":103},
            {"id":104}
        ]
    },
    "schema" : {
        "transaction": {
            "primary_key": ["id"]
        },
        "campaign": {
            "primary_key": ["id"]
        }
    },
    "hasMore" : true
}

やってみた(セットアップ編)

公式情報

前提条件

  • Twitter APIの申請が完了している(キーとシークレットがある)
  • Lambda関数の作成とテストが完了している
  • FivetranのDestinationの設定が完了している
    • 今回はSnowflakeをセットアップ済

Fivetran側の作業

External IDを確認する

Connectorから何らかのAWSサービス系のものを選んでExternal IDをメモっておきます。

AWS側の作業(ポリシーとロール編)

Fivetran用のIAMポリシーの作成

IAMポリシーの作成画面に移動し、下記のポリシーを設定します(JSONタブに貼り付けて作成)。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "InvokePermission",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": "*"
        }
    ]
}

Fivetran用のIAMロールの作成

IAMロールの作成画面に移動します。

「別のAWSアカウント」を選び、アカウントIDに834469178297を入力します(これはFivetran側のAWSアカウントIDなので固定です)。そして下の外部IDに、先程Fivetran側の画面で確認したExternal IDを入力します

先程作成したポリシーを設定し、名前をつけてロールの作成を完了します。

続いて「信頼関係」の設定をします(Lambdaでこのロールを使用するため)。作成したロールの編集画面から「信頼関係」のタブを選び、「信頼関係の編集」を選択します。

下記を貼り付けます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::834469178297:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<FivetranのExternal ID>"
        }
      }
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

下記にようになっていればOKです。

AWS側の作業(AWS Lambda編)

コード(Lambda関数)の準備

Fivetran社(?)が下記のサンプルを用意しています。

上記のリポジトリの中にGoogle Cloud Function用のTwitterのツイートを取得するサンプルコード(Node.js)があります。こちらを用いたウェビナー動画もあり、動画中に登壇者が「このコードはそのままAWS Lambdaにも使えるぜ」みたいなことを言っているのですが、案の定そのままでは動きませんでした

というわけで、AWS Lambdaで動くようにほんのちょっとだけ改修&コメントを日本語化したものを用意しました。下記になります(中身の処理は普通のJSなので、イベントを受け取るところとレスポンスを返すところだけ変えました)。

const https = require('https');

exports.handler = (request, context, callback) => {
    // Twitter APIのトークンを取得する
    console.log('Getting access token from twitter...')
    let post = https.request({
        method: 'POST',
        hostname: 'api.twitter.com',
        path: '/oauth2/token',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
            'Content-Length': 'grant_type=client_credentials'.length,
            'Authorization': 'Basic ' + Buffer.from(`${request.secrets.consumerKey}:${request.secrets.consumerSecret}`, 'utf8').toString('base64')
        }
    }, postRes => {
        var reply = ''
        postRes.setEncoding('utf8')
        postRes.on('data', chunk => reply += chunk)
        postRes.on('end', () => withAuth(JSON.parse(reply)))
    })
    post.on('error', e => console.log(`Problem with request ${e}`))
    post.write('grant_type=client_credentials')
    post.end()

    // トークンを取得したら、それを使って長州力のタイムラインをリクエストする
    function withAuth(auth) {
        console.log('...got access token from twitter, fetching tweets...')
        var path = '/1.1/statuses/user_timeline.json?screen_name=rikichannel1203'
        // これが初回リクエストではない場合は、リクエストパスに since_id={since_id} を追加する
        if (request.state.since_id != null) {
            console.log(`...getting new tweets since id ${request.state.since_id}...`)
            path += '&since_id=' + request.state.since_id
        }
        // 直近のツイートを取得する
        let get = https.get({
            hostname: 'api.twitter.com',
            path: path,
            headers: {
                'Authorization': 'Bearer ' + auth.access_token
            }
        }, getRes => {
            var reply = ''

            getRes.on('data', chunk => reply += chunk)
            getRes.on('end', () => withTweets(JSON.parse(reply)))
        })
    }

    // 長州力のタイムラインを取得したら、それを使ってDWHにデータをアップサートする
    function withTweets(timeline) {
        console.log(`...got ${timeline.length} tweets, sending to Fivetran.`)
        // 最新のIDを記録しておくことで、今後の更新を増分更新にする
        let since_id = null
        // Twitter APIからのレスポンスをフラットテーブルの形に再フォーマットする
        let tweets = []
        let users = []
        for (let t of timeline) {
            // 最初のIDを記録する
            if (since_id == null) {
                since_id = t.id
            }
            // すべてのツイートを追加する
            tweets.push({
                id: t.id,
                user_id: t.user.id,
                text: t.text
            })
            // ユーザー情報を追加する
            // 重複処理はFivetran側で処理してくれるので、ここで考える必要はない
            users.push({
                id: t.user.id,
                name: t.user.name,
                screen_name: t.user.screen_name
            })
        }
        // JSONのレスポンスをFivetranに送信する
        let resulut = ({
            // 最新のIDを記録しているため、リクエストは増分更新となる
            state: {
                since_id: since_id == null ? request.state.since_id : since_id
            },
            // Fivetranはこれらの主キーを使用してマージを実行する
            // そのため、同じレコードを2回送信しても、DWHには1つ分しか送られない
            schema : {
                tweets : {
                    primary_key : ['id']
                },
                users: {
                    primary_key: ['id']
                }
            },
            // 下記のレコードをDWHに挿入する
            insert: {
                tweets: tweets,
                users: users
            },
            // これがtrueの場合、Fivetranは再び関数を実行し、再度データを取得する
            // 大量のデータ取得に役立つ
            hasMore : false
        });

        callback(null, resulut);
    }
}

関数を作成する

コードが用意できたところで、マネジメントコンソールからLambda関数を作成していきます。

「一から作成」を選び、ランタイムはNode.js 12.x、ロールは先程作成したロールを指定します。

関数を記述してテストする

関数を作成したら、上記で用意したコードを貼り付けます(普通はローカル環境で開発して、諸々Zipで固めてアップロードしますが、今回はちょっとした検証なので、そのまま画面にペーストします)。

コードを保存後、テストします。テストで投げるイベントは下記の通り。前述した仕様のリクエストに沿ってますね。Twitter APIの認証情報を投げます。テストなのでstateは空でOKです(初回実行の体)。

成功しました。ちゃんと長州力氏のツイートがGETできてます。

Fivetran側の作業

Connectorsの作成

AWS LambdaをConnectorとして登録します。

Connectorの詳細設定を行います。Role ARNは先程作成したロールを、Lambda Functionには今回作成したLambda関数のARNを記述します。

Secretsですが、ざっくりいうと、ここにはFivetranからLambda関数に投げるイベントを設定します。今回のLambda関数はTwitter APIの認証情報をイベントとして受け取ると実行されるようになっているので、Secretsには下記を設定します(テストで使用したイベントの一部を流用すればよい)。

{
    "consumerKey": "<Twitter APIのアクセスキー>",
    "consumerSecret": "<Twitter APIのアクセスシークレット>"
}

初回のデータ連携

一通り設定すると接続テストが実施されるので見守ります。

初回連携を実施するようメッセージが出てくるので、実行します。

データを確認してみた

連携終了後、実際にSnowflake側のデータを確認してみました。

スキーマはご覧の通り。

ちゃんとツイートとツイートしたユーザーのデータが構造化データとして入っています。

今回は検証用に試してみただけなので、データの確認後は定期連携を切りましたが、ONのままにしていると、定期的にDWHに長州力氏のツイートが格納されるようになります

おわりに

AWS Lambdaを使うと、連携できるデータの幅がめちゃくちゃ広がると思います。