自前でデータパイプラインをサクッと構築できる「Airbyte」を試してみた

本記事は、データパイプラインを簡単に構築できるツールAirbyteでロードジョブを試していきます。

本アドベントカレンダーでは、日本でも海外でもマイナー寄りな製品を取り扱ってきてますが、多分一番知名度があるのがこのAirbyteになるかなと思います。

Airbyteについて

Airbyteは2020年にサンフランシスコで創業されたテックカンパニーです。創業からわずか1年でシリーズAに到達し、$26Mの資金調達に成功しているスタートアップで、急激な成長速度で業界から注目を集めています。

Airbyte announces $26M Series A for open-source data connector platform | TechCrunch

その最大の特徴は製品をOSSで公開しているという点でしょう。自身のAWSにデプロイすればAirbyteの機能をそのまま使えてしまう導入障壁が低さから、モダンデータスタック界隈のエンジニアから支持を集めています。最近Hashicorp社が上場して注目を集めたように、OSSを絡めてサービス展開する企業の戦略には目を見張るものがありますね。

対応しているデータソースの数もかなり豊富で、Airbyteを使えば簡単にデータレイクやデータベース間の同期ジョブを実現できます。自社でコンテナの面倒を見たくない方向けにマネージドなCloud版も用意されていますが、2021年12月11日現在ではUSのみ利用可能となっています。

さっそく、Airbyteを試していきましょう!

RDS PostgreSQLからRedshiftへデータをロードする

公式HPからdeploy Airbyte Open-sourceをクリックすると、OSS版のAirbyteのドキュメントに飛びます。

Deploy Airbyte - Airbyte Documentationに従い、以下のコマンドを実行してDockerで環境を立ち上げます。

$ git clone https://github.com/airbytehq/airbyte.git
$ cd airbyte
$ docker-compose up

http://localhost:8000/にアクセスし、メールアドレスをしてContinueをクリックします。

初期設定の画面にランディングします。Set up your first connectionをクリックし、コネクションのセットアップを進めていきます。

今回は、パブリックに配置したRDS PostgreSQLインスタンスに PostgreSQL サンプルのデータベースを用意する | DevelopersIO で用意したサンプルデータを、同じくパブリックのRedshiftクラスタへロードするジョブを作成していきます。まずはSourceとなるRDS PostgreSQLインスタンスの接続情報を入力します。SSH経由での接続も可能みたいなので、プライベートサブネットに配置されていても対象にできそうですね。

次にDestinationとなる、Redshiftクラスタの接続情報を入力していきます。

最後に、Connectionのセットアップを行います。特にスケジュール実行させる予定はないので、Syncの頻度はmanualにしています。

SourceとDestination間で対象とするスキーマやテーブルをマッピングしたり、Sync modeで、追加するか上書きするか調節も可能です。

画面下部のNormalization & Transformationで、データの変換処理も定義できます。こちらは次節で掘り下げていくので、一旦No custom transformationのままでいきます。Set up connectionをクリックして確定。

オンボーディングのガイドは閉じてしまってOKです。

左メニューでConnectionsをクリックすると、先ほど作成したConnectionが表示されています。LaunchをクリックしてSyncを起動させます。

RDS内のデータがローカル端末を経由してRedshiftにインサートされました。全件データを捌けるリソースを必要とするので、Docker用のメモリはある程度確保しておきましょう。今の設定ではDocker Desktopに8GBのメモリを割り当てています。

Redshift側を確認すると、無事RDS側のデータがロードされていることが確認できました。

Airbyteとdbtとの連携仕様を理解する

続いて、先ほどは飛ばしていたNormalization & Transformationを試してみます。こちらはTransformations with SQL (Part 1/3) - Airbyte Documentationにチュートリアルが載っているので、こちらを参考に進めていきます。

AirbyteではSQLベースで変換処理の追加が可能ですが、内部的にはdbtを活用してワークフローを制御しているので、まずは仕様を紐解いていくところから始めていきます。Connectionsの画面でCreate Connectionをクリックし、新規作成を進めていきます。

Sourceとして使用するのは、https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csvに配置されているCovidのサンプルCSVです。

Destinationには、先ほどSourceとして使用していたRDS PostgreSQLを指定します。

Syncの頻度はmanualに、Sync modeはFull refresh | Overwriteにしておきます。Set up connectionをクリックして確定させます。

Connectionsの一覧画面に戻り、Launchをクリックして起動させます。

接続に問題がなければ成功するはずです。今回は注目して欲しい箇所は、ログが出力されている/tmp/workspace/2/0/logs.logというパスです。Dockerコンテナ内の/tmp/workspace/2/0/というディレクトリにリソースが詰まっているみたいです。

以下のコマンドを実行していき、/tmp/workspace/2/0/内のリソースをローカルにコピーしていきます。厳密には、/tmp/workspace/2/0/build/run/airbyte_utils/models/generated/というディレクトリをコピーします。

$ NORMALIZE_WORKSPACE="2/0/"
$ docker cp airbyte-server:/tmp/workspace/${NORMALIZE_WORKSPACE}/build/run/airbyte_utils/models/generated/ models/

ディレクトリにはデータロード時に使用するSQLが入っています。ここに転記するには行数が多いので、興味ある方はTransformations with SQL (Part 1/3) - Airbyte Documentationを参照してください。

$ find models
models
models/airbyte_tables
models/airbyte_tables/public
models/airbyte_tables/public/covid_epidemiology.sql
$ vi models/airbyte_tables/public/covid_epidemiology.sql

次にAirbytesに組み込まれているdbtのプロジェクトを見ていきます。dbt debugコマンドをAirbyteのCLI経由で実行させると、RDSのコネクションの情報が確認できます。

$ docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization debug --profiles-dir=. --project-dir=.
Running with dbt=0.21.1
dbt version: 0.21.1
python version: 3.8.12
python path: /usr/local/bin/python
os info: Linux-5.10.76-linuxkit-x86_64-with-glibc2.2.5
Using profiles.yml file at /data/2/0/normalize/profiles.yml
Using dbt_project.yml file at /data/2/0/normalize/dbt_project.yml

Configuration:
  profiles.yml file [OK found and valid]
  dbt_project.yml file [OK found and valid]

Required dependencies:
 - git [OK found]

Connection:
  host: haruta-demo.XXXXXXXXXXXX.ap-northeast-1.rds.amazonaws.com
  port: 5432
  user: postgres
  database: postgres
  schema: public
  search_path: None
  keepalives_idle: 0
  sslmode: None
  Connection test: [OK connection ok]

接続確認後dbt runを実行すれば、先ほどAirbyteの画面から実行したロードと同じジョブを実行することができます。

$ docker run --rm -i -v airbyte_workspace:/data -w /data/$NORMALIZE_WORKSPACE/normalize --network host --entrypoint /usr/local/bin/dbt airbyte/normalization run --profiles-dir=. --project-dir=.
Running with dbt=0.21.1
Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
Partial parsing enabled, no changes found, skipping parsing
[WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 2 unused configuration paths:
- models.airbyte_utils.generated.airbyte_incremental
- models.airbyte_utils.generated.airbyte_views

Found 4 models, 0 tests, 0 snapshots, 0 analyses, 479 macros, 0 operations, 0 seed files, 1 source, 0 exposures

09:21:53 | Concurrency: 32 threads (target='prod')
09:21:53 | 
09:21:53 | 1 of 1 START table model public.covid_epidemiology........................................................... [RUN]
09:21:53 | 1 of 1 OK created table model public.covid_epidemiology...................................................... [SELECT 17911 in 0.64s]
09:21:54 | 
09:21:54 | Finished running 1 table model in 1.88s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

自前のdbtプロジェクトをAirbyteで実行する

内部構造も理解できたところで、Airbyteの画面からNormalization & Transformationを設定していきます。ConnectionのSettingsを開き、Custom transformationセクションにあるAdd transformationをクリックします。

仕様としては、Gitリポジトリに上がっているdbtプロジェクトをインポートし、エントリポイントを指定して実行できるようになっています。チュートリアルに倣って、dbt公式で用意しているサンプルプロジェクトを指定して、dbt seeddbt rundbt testの3コマンドを実行する変換処理を追加しました。

3つ設定すると画面上ではこんな見た目になります。さっそく実行していきます。

無事完了しました。もちろん、この変換処理ではSourceで設定しているCSVデータに対しては何も変換処理をしていませんが、内部的に外部のdbtプロジェクトが動いていることが確認できます。

PostgreSQLクライアントからDB内を確認すると、新たにサンプルデータが追加されていることが確認できました。

デモの実施は以上です!

所感

Airbyte、かなりクオリティの高いサービスでした。データソースの種類も豊富なので、さまざまサービスやリソース間のデータを単純にロードするケースでは、個人的にも使っていきたいですね。自分でインスタンス立てて気軽に試せる点も◎。

Airbyteで変換処理を追加するためにはdbtの理解も要求されるので、既存のパイプラインツールをAirbyteにリプレースする場合は、変換処理もdbt流に書き換えておく必要がありそうです。まさにモダンデータスタックに移行するぞ!という気概のある方向けの、文字通りモダンなサービスでした。

本アドベントカレンダーでは、今話題のデータ関連SaaSを取り上げていきますので、引き続き乞うご期待ください!