AWS DMSから出力されたファイルをSnowflakeにロードしてみた

2021.11.17

こんにちは!DA(データアナリティクス)事業本部 サービスソリューション部の大高です。

外部データベースからSnowflakeへのデータ連携方法として、世の中には様々な便利なプロダクトがあると思います。これらのプロダクトはとてもユーザーフレンドリーで管理もしやすいので、個人的にはプロダクトの利用をオススメしたいです。

一方で、「リアルタイム連携がしたい」、「細かいカスタマイズがしたい」、「コストを抑えたい」というケースなどでプロダクトの利用がフィットしない場合があり、その場合にはプロダクトに代わる方法を自ら実装する必要が出てくる場合もあります。

今回はそういった、自ら実装するケースにおいて「外部データベースからSnowflakeへのリアルタイム連携」で良い方法がないかな?と調べている中で、Onishiさんの書かれている、こちらの記事のような方法があることを知りました。

今回はこちらで紹介されている連携方法のうち、下記の図のように「S3」から「Snowflake」へ連携する箇所を実際に試してみたいと思います。

なお、「Aurora」から「S3」へ連携する箇所については、下記エントリもご参照ください。

前提条件

ストレージ統合と外部ステージ

今回利用するAWS DMSがファイル出力するS3バケットのパスについては、事前にストレージ統合や外部ステージの設定を済ませています。

AWS DMSから出力されるファイル

今回はAWS DMSからS3への連携については取り上げません。

DMSから出力されるファイルは、AWSのドキュメントに記載されている通り以下のようなcsvファイルが継続的に連携されるものとしました。

I,101,Smith,Bob,4-Jun-14,New York
U,101,Smith,Bob,8-Oct-15,Los Angeles
U,101,Smith,Bob,13-Mar-17,Dallas
D,101,Smith,Bob,13-Mar-17,Dallas

ここをcsvなどにして、Snowflake側のテーブル定義もカチッとした定義にするかどうかは要件に応じて変わると思います。Aurora側のテーブル定義が変更されたときに、パイプラインで「エラーを起こさせないようにしたい」のか、それとも「エラーにしたいのか」のかなど、要件に応じて変わると思います。

とりあえず今回はお試しなのでcsvでいきます!

S3からSnowflakeへの連携概要

では、実際にS3からSnowflakeへの連携をどうするかを考えたいと思います。

今回は、基本的に以下のエントリで紹介した流れを踏襲してみます。

一方で、「Task」については少し検討が必要です。上記エントリでは単純にStreamに入ってきたデータをそのまま対象テーブルにINSERTしていましたが、今回はAWS DMSが出力するファイルにIUDというDML操作に応じたフラグが存在します。

このフラグに応じてDML操作を変えたいので、TaskのクエリについてはこのDMLフラグを見て処理を変える必要がありますね。

実装してみる

では、実際に実装してみましょう。

テーブルとStreamの作成

まずは、Snowpipeのデータを受けるテーブルと、そのStream、また、最終的にデータをロードするテーブルを作成します。

--------------------------------------------------------------------------------
-- テーブル作成
--------------------------------------------------------------------------------
-- Snowpipe用テーブル作成
CREATE OR REPLACE TABLE ootaka_sandbox_db.public.cdc_users_pipe(
  dml_type  VARCHAR,
  id        NUMBER,
  name1     VARCHAR,
  name2     VARCHAR,
  date      VARCHAR,
  city      VARCHAR
);

-- 実テーブル作成
CREATE OR REPLACE TABLE ootaka_sandbox_db.public.cdc_users(
  id        NUMBER,
  name1     VARCHAR,
  name2     VARCHAR,
  date      VARCHAR,
  city      VARCHAR
);

--------------------------------------------------------------------------------
-- STREAM作成
--------------------------------------------------------------------------------
CREATE OR REPLACE STREAM ootaka_sandbox_db.public.cdc_users_stream ON TABLE ootaka_sandbox_db.public.cdc_users_pipe;

初期データのロード

次に初期データのロードをします。

ここが少しポイントになる点ですが、AWS DMSはフルロードでAuroraからのデータをS3に出力するとLOAD00000001.csvというようなファイルを出力します。

このファイルのデータには標準ではDMLフラグは含まれずに、以下のような形式となります。

100,Raiden,Jack,4-Jul-76,Philadelphia

そのため、このファイルは対象テーブルへ直接ロードする必要があるので、以下のようなクエリでロードする必要があります。

--------------------------------------------------------------------------------
-- 初期データロード
--------------------------------------------------------------------------------
COPY INTO ootaka_sandbox_db.world.city
FROM @ootaka_sandbox_db.public.cm_ootaka_external_stage/Snowpipe/CDC/LOAD00000001.csv
  FILE_FORMAT = (TYPE = 'CSV')
;

なお、今回ここでは取り上げませんが、DMSのS3に対するエンドポイント設定において、IncludeOpForFullLoadオプションを有効にすることでによってDMLフラグも出力されるようになり、上記の対応については不要になる認識です。

Snowpipeの作成

次にSnowpipeを作成します。

これによりS3上に配置したファイルのデータは、自動的にcdc_users_pipe、および、そのストリームcdc_users_streamに反映されるようになりました。

--------------------------------------------------------------------------------
-- Snowpipe作成
--------------------------------------------------------------------------------
-- Snowpipe作成
CREATE OR REPLACE PIPE ootaka_sandbox_db.public.cm_ootaka_cdc_pipe AUTO_INGEST=TRUE AS
  COPY INTO ootaka_sandbox_db.public.cdc_users_pipe
  FROM @ootaka_sandbox_db.public.cm_ootaka_external_stage/Snowpipe/CDC/
  FILE_FORMAT = (TYPE = 'CSV')
;

-- SQSのARNを取得
DESC PIPE cm_ootaka_cdc_pipe;

-- 上記ARNでS3イベント設定をする(AWSコンソール上での作業)

-- PIPEのリフレッシュ
ALTER PIPE ootaka_sandbox_db.public.cm_ootaka_cdc_pipe REFRESH;

Taskの作成

最後にTaskの作成です。

ここではMERGE文を利用して、AWS DMSが出力したファイルのDMLフラグI(INSERT)、U(UPDATE)、D(DELETE)に応じた処理の振り分けをしています。

idカラムが主キーになっているので、このキーとDMLフラグを組み合わせて判断をおこなっています。

--------------------------------------------------------------------------------
-- TASK作成
--------------------------------------------------------------------------------
CREATE OR REPLACE TASK cdc_users_task
  WAREHOUSE = X_SMALL_WH
  SCHEDULE = '1 minute'
WHEN
  SYSTEM$STREAM_HAS_DATA('cdc_users_stream')
AS
MERGE INTO ootaka_sandbox_db.public.cdc_users target
USING ootaka_sandbox_db.public.cdc_users_stream src
  ON target.id = src.id
-- INSERT
WHEN NOT MATCHED AND src.dml_type = 'I' THEN
  INSERT
  (
    target.id,
    target.name1,
    target.name2,
    target.date,
    target.city
  )
  VALUES
  (
    src.id,
    src.name1,
    src.name2,
    src.date,
    src.city
  )
-- UPDATE
WHEN MATCHED AND src.dml_type = 'U' THEN
  UPDATE
  SET
    target.name1 = src.name1,
    target.name2 = src.name2,
    target.date = src.date,
    target.city = src.city
-- DELETE
WHEN MATCHED AND src.dml_type = 'D' THEN
  DELETE
;

/*
-- ROLEにタスク実行権限が無い場合には、ACCOUNTADMINから付与すること
USE ROLE ACCOUNTADMIN;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE OOTAKA_SANDBOX_ROLE;
*/

-- タスク実行
ALTER TASK cdc_users_task RESUME;

今回はMERGE文としていますが、他にも色々やり方はあるかと思います。

これでTASKも出来たので、STREAMにデータがきたら自動で対象テーブルcdc_usersにデータがロードされるようになりました。

連携の確認

当エントリでは、「S3」から「Snowflake」へ連携する箇所だけを対象とするので、DMSは動かしません。

以下の3ファイルを用意して、想定される動作がされるかだけを確認します。それぞれのファイルをS3バケット配置したあとのcdc_usersテーブルを見てみます。

insert-data.csv

I,999,Raiden,Jack,4-Jul-82,Liberia
SELECT * FROM public.cdc_users;

+-----+--------+-------+----------+---------+                                   
|  ID | NAME1  | NAME2 | DATE     | CITY    |
|-----+--------+-------+----------+---------|
| 999 | Raiden | Jack  | 4-Jul-82 | Liberia |
+-----+--------+-------+----------+---------+
1 Row(s) produced. Time Elapsed: 0.169s

update-data.csv

U,999,Raiden,Jack,4-Jul-09,NewYork
SELECT * FROM public.cdc_users;

+-----+--------+-------+----------+---------+                                   
|  ID | NAME1  | NAME2 | DATE     | CITY    |
|-----+--------+-------+----------+---------|
| 999 | Raiden | Jack  | 4-Jul-09 | NewYork |
+-----+--------+-------+----------+---------+
1 Row(s) produced. Time Elapsed: 0.351s

delete-data.csv

D,999,Raiden,Jack,4-Jul-09,NewYork
SELECT * FROM public.cdc_users;

+----+-------+-------+------+------+                                            
| ID | NAME1 | NAME2 | DATE | CITY |
|----+-------+-------+------+------|
+----+-------+-------+------+------+
0 Row(s) produced. Time Elapsed: 0.205s

想定通りになっていますね!

まとめ

以上、AWS DMSから出力されたファイルをSnowflakeにロードしてみました。

別途、Amazon AuroraからのAWS DMSを利用した一連の流れでの連携も試してみたのですが、結果としてはわりと良い感じに動いたかとおもいます!

一方で以下のような課題も感じられました。このあたりは要件に応じた設計が必要となると思いますので、適宜よく検討して課題を解決していく必要がありそうです。

  • テーブル数が多い場合に、Snowflake側でどのように実装するのか
    • テーブル数だけPIPEを作成する?
  • Snowpipeによってデータを受けるテーブルのメンテナンス
    • ずっとデータが増えていくので、どれぐらいのタイミングでメンテナンスするか
  • エラーの検知
    • ロードエラーがあった場合に、どのように検知・通知をさせるのか
    • どのような状態であれば「正常」な状態なのか
  • リカバリ対応
    • 想定外のファイルがS3上に配置された場合に、どのようにSnowflake側のデータをリカバリするのか

どなたかのお役に立てば幸いです。それでは!