Amazon DynamoDBのデータをFivetranを使ってSnowflakeに連携する
大阪オフィス所属だが現在は奈良県でリモートワーク中の玉井です。
FivetranはSalesforceやZendeskといったSaaSの半構造化データを、DWH等に構造化データとして連携(自動・定期)できる便利なサービスです。実は、SaaS以外にも連携できるサービスはまだまだあって、今回はAmazon DynamoDBをSnowflakeに連携してみます。
FivetranとDynamoDBの連携について
簡単に説明すると、DynamoDBのデータを、Fivetranを通じて、任意のDBやDWHに連携・同期できるというものです。超簡単に図示するとこうなります。
この連携を行う上で重要な事項としては、DynamoDBの半構造化データであり、それをFivetranは自動で構造化データに変換して各種DWH等に格納してくれる、ということです。JSONファイル等の半構造化データを、行と列で構成された構造化データに変換するのは結構面倒なのですが、Fivetranを使えば、そこについては考えなくてもいいということになります。
昨今のアプリケーションは、バックエンドで持つデータの管理を、RDBではなくDynamoDB等のNoSQLデータベースを使用することも少なくありません。しかし、それらのデータを「分析する」となると、BIツール等を使用することになり、必然的に構造化データへの変換が必要となります。そんな時にFivetranを使うと便利になるかもしれません。
やってみた(セットアップ編)
公式情報
前提条件
- DynamoDBにテーブルとデータが入っている
- 今回はサンプルデータを使用
- サンプルテーブルとデータ - Amazon DynamoDB
- FivetranのDestinationの設定が完了している
- 今回はSnowflakeをセットアップ済
Fivetran側の作業
External IDを確認する
Connectorから何らかのAWSサービス系のものを選んでExternal IDをメモっておきます。
AWS側の作業
Fivetran用のIAMポリシーの作成
マネジメントコンソールにログインして、ポリシーの作成画面に移動します。
下記のポリシーをJSONで設定します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:DescribeTable", "dynamodb:GetItem", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "dynamodb:ListTables", "dynamodb:Scan" ], "Resource": [ "*" ] } ] }
名前をつけて作成を完了します。
Fivetran用のIAMロールの作成
続いてロールの作成に入ります。IAMロールの作成画面に移動します。
「別のAWSアカウント」を選び、アカウントIDに834469178297
を入力します(これはFivetran側のAWSアカウントIDなので固定です)。そして下の外部IDに、先程Fivetran側の画面で確認したS3用のExternal IDを入力します。
先程作成したポリシーを選びます。
ロールに名前をつけて作成を完了します(ロールのARNをメモっておきます)。
DynamoDBのストリーム設定
DynamoDBの設定画面に移動し、Fivetranで転送したいテーブルの「ストリームの管理」を選択します。
「新旧イメージ~」を選択し、有効化します。
転送したい全てのテーブルに、これらの設定を行います。
Fivetran側の作業
Connectorsの作成
DynamoDBをConnectorとして登録します。
連携先に作成するスキーマ名、先程作成したロールのARN、DynamoDBのAWSリージョンを入力します。
Sync modeについて
上記に、一点、変わった設定項目として、Sync mode
があります。Sync modeにはUnpacked mode
とPacked mode
の2つがありますが、通常はUnpackedの方で問題ありません。では、Packedはどういう時に選択するのか?ということですが、第1階層にある要素が1000以上存在する場合は、Packedを選びます。
ざっくりいうと「DynamoDBの半構造化データ(のネスト)を展開するかしないか」という感じになります。だから1000以上も要素があるデータをUnpackしちゃうと、連携先のカラム数がドエレーことになっちまうので、Packしたまま転送しましょうねって感じですね。
実際の格納方法の違いは下記に詳しく書いてあるので、そちらをどうぞ。
初回のデータ連携
一通り設定すると接続テストが実施されるので見守ります。
Connectorの設定画面に遷移し、連携するテーブルを選ぶメニューに移動します。
連携するテーブルを選びます。
そして、初回連携を実施します。
データを確認してみた
連携終了後、実際にSnowflake側のデータを確認してみました。
スキーマ
テーブル「ProductCatalog」
Snowflake側
DynamoDB側
テーブル「Reply」
Snowflake側
DynamoDB側
定期的な差分更新もできる
Fivetranを使っている以上、DynamoDBのデータについても定期的な差分更新(自動)が可能となっています。
Fivetranは「DynamoDB Stream Kinesis Adapter」を利用して、差分更新を実現しています。
ですので、新規追加されたデータはもちろん、内容に変更があったデータについても、連携に反映します。
注意点は、DynamoDBストリームの保持時間が24時間だということです。(何らかの原因で)連携の失敗が24時間以上続いた場合(24時間以上成功しなかった場合)、差分更新ではなく完全フル同期を再度行う必要があります。
おわりに
DynamoDBが使われているアプリケーションのデータをDWHに転送してBIツールで分析したい…というケースは結構あると思います。また、Fivetranは他のConnectorもたくさんあるので、Salesforce等のデータも同じDWHに突っ込んで、別々のデータを掛け合わせて新しい分析を行うことも可能です。