S3データのSnowflake取り込み(テーブル作成からデータ連携まで)処理自動化を「データ統合基盤 CS アナリティクス」で行う
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
データ分析を行うためのデータ分析基盤を構築する際には、まずは集めたデータをデータウェアハウス(DWH)にロードする必要があるかと思います。
その際のデータロードの手法として、まずは「抽出」したデータをそのまま「格納」し、その後必要に応じて「変換」を行ってから別のテーブルに移すという、いわゆるELT「Extract(抽出)/Load(格納)/Transform(変換)」の処理を行うパターンがあります。
CSA JMCには、このELTのうち「Load(格納)」の処理をサポートする機能として「テーブル作成機能」と「データ連携機能」があります。
本エントリでは、S3に配置されたデータをSnowflakeにロードする際に、テーブル作成からデータ連携までを、CSA JMCを利用してどのように実現するかをご紹介したいと思います。
CSA JMCの挙動確認バージョン
当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。
- CSA JMC v5.1.0
前提
Snowflakeとの接続設定は済んでいることを前提としています。詳細については、以下のエントリをご参照ください。
全体像について
まずは今回ご紹介する内容の全体像を説明したいと思います。
Snowflakeの「ストレージ統合」と「外部ステージ」
Snowflakeには「ストレージ統合」および「外部ステージ」とよばれるものを利用して、外部クラウドストレージ(ここではS3)を参照できる仕組みがあります。
イメージとしては以下のとおりです。
まずSnowflake上に「ストレージ統合」を作成・設定することでSnowflakeからS3バケット内のオブジェクトを参照できるようになります。
設定においてはSnowflake上での設定に加えて、AWS上でIAMロールの設定も必要となります。
その上で、作成した「ストレージ統合」を利用する「外部ステージ」をデータベース上に作成することで、以下のようにクエリベースでS3上のオブジェクトをロード対象として扱うことができるようになります。
COPY INTO テーブル名 FROM @外部ステージ名;
CSA JMCによる「テーブル作成」と「データ連携」
上記の設定により、SnowflakeからS3バケットのオブジェクトが利用可能な状態となります。
その上で、CSA JMCの「テーブル作成機能」と「データ連携機能」を利用してSnowflakeにテーブルを作成し、データをロードします。
イメージとしては以下となります。
「テーブル作成機能」ではクエリを発行してテーブルを作成します。なお、この際にはS3上の対象ファイルを解析して自動的にテーブルを定義して作成します。
次に「データ連携機能」ですが、こちらはSnowflake上に作成した外部ステージ(透過的にS3バケットのオブジェクトが参照可能)を利用して、テーブルにデータをロードします。
本エントリでは、この流れをふまえて事前準備~テーブル作成、データ連携の流れをご紹介したいと思います。
事前準備
まずは事前準備です。CSA JMCのv5.1.0では、Snowflakeの外部ステージ設定がされていることを前提としていますので、以下の通り外部ステージ設定を行います。
ストレージ統合の作成
CSA JMCにおけるサイトの単位で、ストレージ統合を作成します。この際、まだAWS上でのIAMロールの作成を行っていないので、一旦STORAGE_AWS_ROLE_ARNとSTORAGE_ALLOWED_LOCATIONSはダミーの値を設定します。
ストレージ統合の名前は csa_[JMCのサイトID]
としておきます。なお、JMCのサイトIDは「サイト管理 > サイト設定 > 基本情報」から確認可能です。
USE ROLE CSA; -- Snowflake接続設定で設定しているロール CREATE OR REPLACE STORAGE INTEGRATION csa_csa_devio_snowflake TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::0:role/role_name' STORAGE_ALLOWED_LOCATIONS = ('s3://') ;
成功した場合、以下のように表示されます。
統合 CSA_CSA_DEVIO_SNOWFLAKE が正常に作成されました。
もし、作成時に以下のエラーが発生する場合は、Snowflake接続設定で指定したロールに権限が不足しています。
SQLアクセス制御エラー: account 「XXXXXXX」で操作するには権限が不十分です
ACCOUNTADMINロールを利用して、Snowflake接続設定で指定したロールにCREATE INTEGRATION
権限を付与します。
USE ROLE ACCOUNTADMIN; GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE CSA;
STORAGE_AWS_IAM_USER_ARN と STORAGE_AWS_EXTERNAL_ID の確認
ストレージ統合が作成できたら、DESCRIBE INTEGRATION
コマンドを実行し、STORAGE_AWS_IAM_USER_ARN
とSTORAGE_AWS_EXTERNAL_ID
の値を確認しておきます。
USE ROLE CSA; DESCRIBE INTEGRATION csa_csa_devio_snowflake
IAMロールとポリシーの設定
AWSの管理コンソールから、IAMロールとポリシーを設定していきます。
IAMロールの作成
作成時には、信頼されたエンティティの種類として「別のAWSアカウント」を指定し、「アカウントID」には仮に自分のAWSアカウントIDを指定します。
次に「Attach アクセス権限ポリシー」には何も指定せずに次に進みます。
タグは任意のタグを設定し、ロール名は csa_[JMCのサイトID]
として作成します。
信頼関係の設定
IAMロールを作成したら、改めて該当ロールを開き「信頼関係」タブの「信頼関係の編集」を行います。
ポリシードキュメントは以下のように設定します。Snowflake上でDESCRIBE INTEGRATION
コマンドを実行し、STORAGE_AWS_IAM_USER_ARN
とSTORAGE_AWS_EXTERNAL_ID
の値を確認しましたが、Principal
のAWS
にはSTORAGE_AWS_IAM_USER_ARN
の値を指定し、sts:ExternalId
にはSTORAGE_AWS_EXTERNAL_ID
の値を指定します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::123456789012:user/foo-bar" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "sts:ExternalId": "XXXXXXXXXX" } } } ] }
ポリシーの設定
最後にポリシーの設定です。「アクセス権限」タブを開き、「インラインポリシーの追加」をクリックします。
ポリシーは以下のように設定します。ここでのS3バケット名は、CSA JMCの「サイト管理 > サイト設定 > 構成要素設定」において「データ連携用S3バケット」として設定したバケットを指定します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3:::csa-devio-snowflake-copy/*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::csa-devio-snowflake-copy" } ] }
ポリシー名は任意ですが、csa_[サイトID]_[バケット名("-" や "."は"_"に変更)]
として、今回は csa_csa_devio_snowflake_csa_devio_snowflake_copy
とします。
これでIAMロールは作成完了です。作成した「IAMロールのARN」と設定した「バケット名」を控えておきます。
ストレージ統合の修正
Snowflakeに戻り、ストレージ統合を修正します。
以下のようにSTORAGE_AWS_ROLE_ARN
を先程作成した「IAMロールのARN」に、STORAGE_ALLOWED_LOCATIONS
を設定した「バケット名」に変更します。
USE ROLE CSA; ALTER INTEGRATION csa_csa_devio_snowflake SET STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/csa_csa_devio_snowflake'; ALTER INTEGRATION csa_csa_devio_snowflake SET STORAGE_ALLOWED_LOCATIONS = ('s3://csa-devio-snowflake-copy/');
外部ステージの作成
ストレージ統合が設定できたら、あとは外部ステージの作成です。
IAMロールに設定したインラインポリシー名と同様にして、csa_[サイトID]_[バケット名("-" や "."は"_"に変更)]
として外部ステージを作成します。STORAGE_INTEGRATION
には先程作成したストレージ統合を、URL
にはストレージ統合に設定したバケット名のURLを指定します。
また、外部ステージはデータベースに属するので、USE DATABASE
でSnowflake接続で指定したデータベースを利用するようにします。
USE ROLE CSA; USE DATABASE CSA; CREATE STAGE csa_csa_devio_snowflake_csa_devio_snowflake_copy STORAGE_INTEGRATION = csa_csa_devio_snowflake URL = 's3://csa-devio-snowflake-copy/' ;
これでSnowflakeからS3バケットを外部ステージとして利用する設定ができました。
データファイルの配置
あとは、利用するデータファイルをS3バケットに配置しておきます。
今回は以下のようなCSVファイルをs3://csa-data-bucket/init/ootaka_sample/ootaka_sample.csv
へ配置しました。
id,val 1,val1 2,val2 3,val3
これですべての準備ができました!
本題の確認
さて、ここからようやく本題に入ります。
本エントリのテーマの再確認ですが、「S3に配置されたデータをSnowflakeにロードする際に、テーブル作成からデータ連携までを、CSA JMCを利用してどのように実現するか」になります。
「S3に配置されたデータ」は、先程のootaka_sample.csv
です。このファイルをSnowflakeにロードする際に、「テーブル作成」と「データ連携」をどう実現するかという話になります。
まずは「テーブル作成」からです。
ファイルからテーブルを作成したい
ファイルからテーブルを作成するには、CSA JMCのメニュー「リソース > テーブル作成(ファイルから)」から作成します。
設定は画像の通りです。主要な設定については以下となります。
- テーブル名
- 任意のテーブル名です。今回は
ootaka_sample
としました。
- 任意のテーブル名です。今回は
- バケット名
- データを配置したバケット名です。
- 外部ステージ
- 先程Snowflake上に作成した外部ステージを選択します。
- S3ファイルパス
- データを配置したバケット内のファイルパスです。
この設定で「作成」ボタンをクリックすると、テーブルの作成が「ジョブ」として開始され、ジョブが成功するとSnowflake上にテーブルが作成されていることが確認できます。
定期的にデータロードを実行したい
次に、「データ連携」についてです。
今回はテーブル作成時に指定したS3パスに、毎日午前6時までに定期的にデータがアップロードされるという前提で考えます。この場合、午前6時にはデータ連携を開始可能なので固定パスのファイルを毎日午前6時にSnowflake上のテーブルにロードするように設定してみます。
CSA JMCのメニュー「構成要素 > データ連携」から新規にデータ連携を作成します。設定は以下のようにします。
主要な設定については以下となります。
- スキーマ名
- データロードを行うテーブルのスキーマを指定します。
- テーブル名
- データロードを行うテーブル
ootaka_sample
を指定します。
- データロードを行うテーブル
- バケット名
- データを配置したバケット名です。
- 外部ステージ
- Snowflake上に作成した外部ステージを選択します。
- S3ファイルパスを固定指定にする
- 特定のS3パスにファイルが定期的にアップロードされる前提なので、固定とします。
- S3ファイルパス
- データが配置されるバケット内のファイルパスです。
- 取り込み方式
- 今回は「全件洗替」としています。
設定したら「保存」して、「ジョブ > ジョブ一覧」から「ジョブの追加」に進みます。以下のように「構成要素を実行」するジョブを追加します。
ジョブを作成したら、「実行予定時刻」を「日次」の「06:00」に指定します。これで定期実行スケジュールを指定できました。
次に「構成要素」の「編集」をクリックして、先程作成した「データ連携」を追加して保存します。これにより、先程設定した通りの設定でS3上のファイルがSnowflakeへ連携されます。
最後に、ジョブ一覧画面に戻ってジョブを有効化して完了です。これで定期的にジョブが実行され、データロードされるようになりました。
試しにジョブを実行すると、このようにS3上のデータがSnowflakeのテーブルにロードされていることが確認できます。
まとめ
以上、S3に配置されたデータをSnowflakeにロードする際に、テーブル作成からデータ連携までを、CSA JMCを利用してどのように実現するかをご紹介しました。
どなたかのお役に立てば幸いです。それでは!