Amazon DynamoDBのデータをFivetranを使ってSnowflakeに連携する

ふぇーべてれーんで、でーなもでーべー
2020.08.07

大阪オフィス所属だが現在は奈良県でリモートワーク中の玉井です。

FivetranはSalesforceやZendeskといったSaaSの半構造化データを、DWH等に構造化データとして連携(自動・定期)できる便利なサービスです。実は、SaaS以外にも連携できるサービスはまだまだあって、今回はAmazon DynamoDBをSnowflakeに連携してみます。

FivetranとDynamoDBの連携について

簡単に説明すると、DynamoDBのデータを、Fivetranを通じて、任意のDBやDWHに連携・同期できるというものです。超簡単に図示するとこうなります。

この連携を行う上で重要な事項としては、DynamoDBの半構造化データであり、それをFivetranは自動で構造化データに変換して各種DWH等に格納してくれる、ということです。JSONファイル等の半構造化データを、行と列で構成された構造化データに変換するのは結構面倒なのですが、Fivetranを使えば、そこについては考えなくてもいいということになります。

昨今のアプリケーションは、バックエンドで持つデータの管理を、RDBではなくDynamoDB等のNoSQLデータベースを使用することも少なくありません。しかし、それらのデータを「分析する」となると、BIツール等を使用することになり、必然的に構造化データへの変換が必要となります。そんな時にFivetranを使うと便利になるかもしれません。

やってみた(セットアップ編)

公式情報

前提条件

Fivetran側の作業

External IDを確認する

Connectorから何らかのAWSサービス系のものを選んでExternal IDをメモっておきます。

AWS側の作業

Fivetran用のIAMポリシーの作成

マネジメントコンソールにログインして、ポリシーの作成画面に移動します。

下記のポリシーをJSONで設定します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
                "dynamodb:ListTables",
                "dynamodb:Scan"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

名前をつけて作成を完了します。

Fivetran用のIAMロールの作成

続いてロールの作成に入ります。IAMロールの作成画面に移動します。

「別のAWSアカウント」を選び、アカウントIDに834469178297を入力します(これはFivetran側のAWSアカウントIDなので固定です)。そして下の外部IDに、先程Fivetran側の画面で確認したS3用のExternal IDを入力します

先程作成したポリシーを選びます。

ロールに名前をつけて作成を完了します(ロールのARNをメモっておきます)。

DynamoDBのストリーム設定

DynamoDBの設定画面に移動し、Fivetranで転送したいテーブルの「ストリームの管理」を選択します。

「新旧イメージ~」を選択し、有効化します。

転送したい全てのテーブルに、これらの設定を行います。

Fivetran側の作業

Connectorsの作成

DynamoDBをConnectorとして登録します。

連携先に作成するスキーマ名、先程作成したロールのARN、DynamoDBのAWSリージョンを入力します。

Sync modeについて

上記に、一点、変わった設定項目として、Sync modeがあります。Sync modeにはUnpacked modePacked modeの2つがありますが、通常はUnpackedの方で問題ありません。では、Packedはどういう時に選択するのか?ということですが、第1階層にある要素が1000以上存在する場合は、Packedを選びます。

ざっくりいうと「DynamoDBの半構造化データ(のネスト)を展開するかしないか」という感じになります。だから1000以上も要素があるデータをUnpackしちゃうと、連携先のカラム数がドエレーことになっちまうので、Packしたまま転送しましょうねって感じですね。

実際の格納方法の違いは下記に詳しく書いてあるので、そちらをどうぞ。

初回のデータ連携

一通り設定すると接続テストが実施されるので見守ります。

Connectorの設定画面に遷移し、連携するテーブルを選ぶメニューに移動します。

連携するテーブルを選びます。

そして、初回連携を実施します。

データを確認してみた

連携終了後、実際にSnowflake側のデータを確認してみました。

スキーマ

テーブル「ProductCatalog」

Snowflake側

DynamoDB側

テーブル「Reply」

Snowflake側

DynamoDB側

定期的な差分更新もできる

Fivetranを使っている以上、DynamoDBのデータについても定期的な差分更新(自動)が可能となっています。

Fivetranは「DynamoDB Stream Kinesis Adapter」を利用して、差分更新を実現しています。

ですので、新規追加されたデータはもちろん、内容に変更があったデータについても、連携に反映します。

注意点は、DynamoDBストリームの保持時間が24時間だということです。(何らかの原因で)連携の失敗が24時間以上続いた場合(24時間以上成功しなかった場合)、差分更新ではなく完全フル同期を再度行う必要があります。

おわりに

DynamoDBが使われているアプリケーションのデータをDWHに転送してBIツールで分析したい…というケースは結構あると思います。また、Fivetranは他のConnectorもたくさんあるので、Salesforce等のデータも同じDWHに突っ込んで、別々のデータを掛け合わせて新しい分析を行うことも可能です。