EC2 にインストールした Airbyte でデータ連携を試してみた

2024.04.19

はじめに

Airbyte はデータパイプライン構築時に使用できるオープンソースの ELT ツールとして提供されています。

本記事ではクイックスタートのようなイメージで、オープンソース版の Airbyte を EC2 にインストールし、データの統合を行ってみます。
インストール手順は以下にまとまっているので、こちらを参照しました。

なお、上述の通りクイックスタート的な意味合いが強く各設定の細かな点までは触れていないためご注意ください。

前提条件

ここでは、以下の構成で検証しました。

  • デプロイタイプ
    • Airbyte on AWS EC2
  • インスタンス
    • t2.medium
    • OS: Amazon Linux 2
    • ストレージ:20 GB
  • 構成
    • Airbyte 用の EC2 はパブリックサブネットに構築
  • Data Source
    • Amazon RDS for PostgreSQL:16.1-R2
      • インスタンスタイプ:db.t3.micro
    • プライベートサブネットに構築
  • Destination
    • Snowflake

Airbyte のインストール

EC2 に SSH接続後、以下のコマンドを実行し、Dcoker サービスをインストールし、起動します。

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo usermod -a -G docker $USER

続けて、Docker Compose のプラグインをインストールするコマンドがありますが、このままではエラーとなってしまいました。

sudo yum install -y docker-compose-plugin
docker compose version

ドキュメントにも記載がある通り、ここでは以下の手順を参照しながらインストールを行いました。

指定のディレクトリを作成

sudo mkdir -p /usr/local/lib/docker/cli-plugins
cd /usr/local/lib/docker/cli-plugins

リリースページから該当のファイルをダウンロード

sudo curl -OL https://github.com/docker/compose/releases/download/v2.26.1/docker-compose-linux-x86_64

名称変更と権限の付与

sudo mv docker-compose-linux-x86_64 docker-compose
sudo chmod +x docker-compose

確認

$ docker compose version
Docker Compose version v2.26.1

再度、インスタンスに SSH 接続後、以下のコマンドでインストールを行います。

mkdir airbyte && cd airbyte
wget https://raw.githubusercontent.com/airbytehq/airbyte/master/run-ab-platform.sh
chmod +x run-ab-platform.sh
./run-ab-platform.sh -b

Airbyte への接続

ローカルで以下のコマンドを実行し SSH トンネルを作成します。

SSH_KEY=<キーファイル>.pem 
INSTANCE_IP=<インスタンスのIPアドレス>
ssh -i $SSH_KEY -L 8000:localhost:8000 -N -f ec2-user@$INSTANCE_IP

ローカルのブラウザから`http://localhost:8000` にアクセスすると下図の表示となるので、必要事項を入力し [Get started] をクリックします。

Airbyte にログインできると下図の表示となります。

コネクションの作成

このまま画面の [Create your first connection] よりコネクションを作成してみます。

データソースの定義

はじめに、データソースを定義します。ソースがない状態なので「Set up source」が選択された状態から接続したいデータソースのコネクタを選択します。

ここでは、データソースとして Amazon RDS for PostgreSQL を使用するので、PostgreSQL 用のコネクタを選択します。
設定手順は以下に記載があります。

PostgreSQL 側には事前に以下のテーブルを作成しておきました。

--データベースを作成
CREATE DATABASE sampledb;
\c sampledb
--スキーマを作成
CREATE SCHEMA test;
--テーブル作成
CREATE TABLE test.users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);
--レコードの追加
INSERT INTO test.users (name, email) VALUES ('John Doe', 'john.doe@example.com');
INSERT INTO test.users (name, email) VALUES ('Jane Smith', 'jane.smith@example.com');
INSERT INTO test.users (name, email) VALUES ('Mike Brown', 'mike.brown@example.com');

また、Airbyte 用の読み取り専用ユーザーも作成しておきます。

--ユーザーを作成
CREATE USER airbyte_user PASSWORD 'パスワード';
--権限を付与
GRANT USAGE ON SCHEMA test TO airbyte_user ;
GRANT SELECT ON ALL TABLES IN SCHEMA test TO airbyte_user ;
ALTER DEFAULT PRIVILEGES IN SCHEMA test GRANT SELECT ON TABLES TO airbyte_user ;

コネクタを選択後、接続情報を入力します。
ここでは下図の通り設定しました。

セキュリティとレプリケーションの方法を選択

レプリケーションの方法として、ここでは Xmin を指定しました。
セットアップを完了すると接続テストが行われます。接続に成功すると、コネクションの作成画面に遷移できます。

Destination の定義

Destination には Snowflake を指定しました。

Snowflake 環境にログインしドキュメントにある以下のコマンドを実行し、データのロード先となるデータベース・スキーマ、ロード用のウェアハウス、ユーザーを作成します。

-- set variables (these need to be uppercase) 
set airbyte_role = 'AIRBYTE_ROLE'; 
set airbyte_username = 'AIRBYTE_USER'; 
set airbyte_warehouse = 'AIRBYTE_WAREHOUSE'; 
set airbyte_database = 'AIRBYTE_DATABASE'; 
set airbyte_schema = 'AIRBYTE_SCHEMA';

-- set user password 
set airbyte_password = 'パスワード';

begin;

-- create Airbyte role 
use role securityadmin; 
create role if not exists identifier($airbyte_role); 
grant role identifier($airbyte_role) to role SYSADMIN;

-- create Airbyte user 
create user if not exists identifier($airbyte_username) 
	password = $airbyte_password 
	default_role = $airbyte_role 
	default_warehouse = $airbyte_warehouse;

grant role identifier($airbyte_role) to user identifier($airbyte_username);

-- change role to sysadmin for warehouse / database steps 
use role sysadmin;

-- create Airbyte warehouse 
create warehouse if not exists identifier($airbyte_warehouse) 
	warehouse_size = xsmall 
	warehouse_type = standard 
	auto_suspend = 60 
	auto_resume = true 
	initially_suspended = true;

-- create Airbyte database 
create database if not exists identifier($airbyte_database);

-- grant Airbyte warehouse access 
grant USAGE on warehouse identifier($airbyte_warehouse) to role identifier($airbyte_role);

-- grant Airbyte database access 
grant OWNERSHIP on database identifier($airbyte_database) to role identifier($airbyte_role);

commit;

begin;

USE DATABASE identifier($airbyte_database);

-- create schema for Airbyte data 
CREATE SCHEMA IF NOT EXISTS identifier($airbyte_schema);

commit;

begin;

-- grant Airbyte schema access 
grant OWNERSHIP on schema identifier($airbyte_schema) to role identifier($airbyte_role);

commit;

コマンド実行後、データベースには以下のオブジェクトが作成されます。

Snowflake への接続に必要な各項目を入力します。

接続テスト完了後、コネクションの設定を行います。

コネクションの設定

各設定項目は以下に記載があります。

  • Schedule type
    • 同期の実行オプションとして以下を指定できます
      • スケジュール(24時間ごと など)
      • Cron
      • 手動実行
    • Sync Schedules | Airbyte
  • Destination Namespace
  • Destination Stream Prefix
    • 宛先の各テーブル名に追加されるプレフィックスを指定できます
  • Detect and propagate schema changes

ここではスケジュールのみ 1時間間隔とし、他はデフォルトのまま進めました。 Activate the streams you want to sync では、同期する対象のテーブルを選択します。

Transformation は無しとしました。Airbyte では SQL や dbt と連携することで、データロード後の正規化も担えるようです。

設定が完了すると、選択したテーブルに対して同期が始まります。

同期完了後、Snowflake でテーブルを確認してみます。

ここでは、Destination Namespace を Destination default としたので、Destination の設定時に指定したスキーマ内にテーブルが作成されました。

クエリしてみると、下図の通りデータが同期されていることを確認できました。

追加出力されるカラムについては、以下に記載があります。

データの変更

初期同期後、ソース側でレコードを追加してみます。

INSERT INTO test.INSERT INTO test.users (name, email) VALUES ('Alex Johnson', 'alex.johnson@example.com');
users (name, email) VALUES ('Alex Johnson', 'alex.johnson@example.com');

再度同期されたら、Snowflake 側で確認すると、こちらにも追加されています。

ソース側でテーブルを追加してみます。

CREATE TABLE test.products (
  product_id SERIAL PRIMARY KEY,
  product_name VARCHAR(255),
  price DECIMAL(10, 2),
  category VARCHAR(100)
);
INSERT INTO test.products (product_name, price, category) VALUES 
('Laptop', 1200.00, 'Electronics'),
('Smartphone', 800.00, 'Electronics'),
('Coffee Maker', 150.00, 'Kitchen Appliances'),
('Desk Lamp', 45.99, 'Furniture'),
('Ergonomic Chair', 249.99, 'Office Supplies');

すると下図の表示となり、Airbyte 側で変更が検知されます。

ここでは、追加で同期したいのでこのテーブルにもチェックを入れます。

変更確認のポップアップが表示されるので、[Save connection] をクリックします。

すると、同期が始まります。

ジョブ履歴でも確認できます。

同期後、Snowflake 側でもテーブルの追加を確認できます。

こちらについて、レコードを変更してみます。

UPDATE test.products SET price = 10000 WHERE product_id = 1;

下図のように Snowflake 側でも変更が反映されていました。

検証中に詰まった所

はじめは EC2 のストレージをデフォルトの 8GB で検証していたのですが、この場合、コネクタ設定時に以下のようなエラーとなってしまいました。
※データソース側についても同様の表示

Caused by: io.temporal.failure.ApplicationFailure: message='Could not find image: airbyte/destination-snowflake:3.7.0', type='io.airbyte.workers.exception.WorkerException', nonRetryable=false

ここでは、システムのルートファイルシステム(/dev/xvda1)にほぼ空きがないことが原因でした。
ただ、この場合、容量を増やしても再設定時に自動でイメージがインストールされることはなかったので、結局ストレージの容量を増やし環境を作り直すことで解決しました。

さいごに

オープンソース版の Airbyte を EC2 にインストールし、データ連携をしてみました。
設定自体は GUI から容易にできたので、無償で使える点も含めて強力なツールと感じました。
ただし、特に構築や運用面を考慮するとコンテナ周りの知識が求められるので、私自身コンテナ周辺の知識がなく、この点は理解を深める必要があると感じました。
こちらの内容が何かの参考になれば幸いです。