[Fivetran] Aiven for Apache Kafkaを接続してストリーミングデータをDWHに同期してみた

こちらAiven、Fivetranとの接続に成功した。
2022.01.14

大阪オフィスの玉井です。

FivetranはSaaSやDBだけでなく、ストリーミングデータも同期の対象に設定することができます。

というわけで、今回はApache Kafkaを接続してみます。

しかし、ただのKafkaではなく、Aivenというサービスを使用して立ち上げたKafkaを接続してみたいと思います。

Aivenについて

Aivenそのものについて

Aivenを使用してKafkaを立ち上げる系

(接続設定を)やってみた

Aiven側

Kafkaを起動する

AivenがApache Kafka 3.0をサポートしました | DevelopersIOと同様のやり方で、Kafkaを起動します。

証明書関係を一式ダウンロードする

上記記事にも書かれていますが、Kafkaの接続情報(に関するファイル)をダウンロードしておきます(Fivetran側で使用する)。

IPアドレスの設定

初期設定だと全開放されていますが、IP制限したい場合、最低でもFivetran側のIPを指定する必要があります。

FivetranのIPは下記で確認できます。

auto_create_topics_enableをONにする

これは必須設定ではありませんが、後でデータ投入する時に楽になるので、今回はONにします。

Overviewページの最下部にAdvanced configurationという設定があります。そこの赤文字の「Add configuration option」から、kafka.auto_create_topics_enableという設定を出すことができます。出した後は、トグルスイッチを切り替えてON状態にした後、右下の「Save advanced configuration」を押すことで、設定が完全に保存されます(このSave〜を押し忘れがちなので注意)。

Fivetran側

Fivetranの公式ドキュメントに、Kafkaを設定する手順が書かれています。しかし、Aiven for Kafkaの場合、ドキュメントに載っている作業はほとんどスッ飛ばすことができます。

Kafka Connectorを設定する

Fivetranにログインして、Connector画面からKafkaを探して設定します。

接続設定は下記の通り。

重要なのは、最後の証明書関係の部分です。設定は「TLS」にし、Aiven側でダウンロードしておいた各種ファイルをそのままアップロードします。

本来は、これらのファイルはKafka側で手動生成しないといけないのですが(その手順がFivetranのドキュメントに書いてある)、Aiven for Kafkaの場合は、すでにそれらのファイルが用意されているので、それをダウンロードしてFivetranにアップロードするだけで接続できます。楽!!!

ファイル形式は、実際に送信するデータの形式に沿って設定しましょう。

問題無ければ、接続テストが行われて、設定が完了します。

(データの同期を)やってみた

Aiven for KafkaとFivetranをつなぐことができたので、ここからは実際にデータをストリーミングして、実際にDWHに入るかどうかやってみたいと思います。

Kafkaにデータを入れる(送る)

Aivenのドキュメントに、サンプルデータを簡単に送信できるツールが紹介されていたので、それを使います(ピザの注文データ?)。

上記を一式ローカルに持ってきます。

$ gh repo clone aiven/python-fake-data-producer-for-apache-kafka

そのディレクトリに移動して、下記を実行し、必要なPythonライブラリをインストールします。

$ cd python-fake-data-producer-for-apache-kafka/
$ pip3 install -r requirements.txt

後は、ツールを実行して、Kafkaにデータを送ります。このツールは、意図的にキャンセルするまで、ずーっとKafkaにストリーミングデータを送り続けてくれます。

$ python3 main.py --cert-folder (Kafkaの証明書関係3ファイルを入れているディレクトリ) \                                                             --host (Kafkaのホスト名) \                                                                   --port (Kafkaのポート番号) \                                                                          --topic-name pizza-orders \                                                                      --nr-messages 0 \                                                                        --max-waiting-time 00

送信するデータの形式も、上記のGithub(のREAD.md)に記載が載っています。下記にも載せておきます。この半構造化データを、FivetranがDWHのテーブルにどういう風にロードするのか…見ものでございます。

{
  "id": 0,
  "shop": "Circular Pi Pizzeria",
  "name": "Jason Brown",
  "phoneNumber": "(510)290-7469",
  "address": "2701 Samuel Summit Suite 938\nRyanbury, PA 62847",
  "pizzas": [{
    "pizzaName": "Diavola",
    "additionalToppings": []
  }, {
    "pizzaName": "Mari & Monti",
    "additionalToppings": ["olives", "garlic", "anchovies"]
  }, {
    "pizzaName": "Diavola",
    "additionalToppings": ["onion", "anchovies", "mozzarella", "olives"]
  }]
}

Fivetranで同期を行う

上記ツールで、ある程度データを送信したら、Fivetranを使用して、DWHにデータをロードしてみます。

初期接続設定直後、まずSchema設定を実施するようにFivetranから言われるので、それに従います。Schema設定といっても、Fivetran側がKafkaをスキャンして、同期対象を出してくれるので、どのデータを同期するか選ぶだけです。ちなみに、1Topic = 1テーブル となります。

同期したいTopicを選んだら、後はいつも通り、Syncボタンを押して、同期を開始します。

DWHにロードされたデータを確認する

同期が完了しました。

というわけで、DWH側を見てみましょう。

うーん…入っていることは入っているのですが…使いづらい形ですね…。

なぜこのような形で入っているかと言うと、Fivetran側の設定で「JSONファイル」「Packed」としていたからです。文字とおり、ファイルの値をPackしたまま入れるということで、valueという1つのカラムに、データ全部がドバッと入っちゃってます(keyの入り方も微妙…)。

最近のDWHは、こういう半構造化データも扱えるようになってきていますが、どうせなら最初から、ある程度バラされて入ってくる方が嬉しいですよね。というわけで、Fivetran側の設定を「Unpacked」にして、やり直してみます。これも名前の通りですが、JSONファイルの中をバラして、各カラムにした状態でロードしてくれます。

…というわけで、下記がUnpackedでロードしたデータになります。

Packedに比べてだいぶバラけました。ただ、2階層目以降のデータはさすがにそのままです。これ以降は、Transformation等を使って、後からバラしていくしかありませんね。

おわりに

Fivetranのストリーミングデータの扱い(どういう風に構造化データに変換するのか)は、ほとんど予想通りでした。

今回驚いたのは、Aivenで立ち上げたKafkaの場合、ほとんどノー設定で、そのままFivetranに接続できたことです。これは本当に大きいと思います。

「ストリーミングデータを分析したい」「けど、ストリーミング基盤も、それをDWHに入れる手段も無い…」という課題をお持ちの方にはピッタリな組み合わせだと思いました。