S3データのSnowflake取り込み(テーブル作成からデータ連携まで)処理自動化を「データ統合基盤 CS アナリティクス」で行う

2020.10.13

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

データ分析を行うためのデータ分析基盤を構築する際には、まずは集めたデータをデータウェアハウス(DWH)にロードする必要があるかと思います。

その際のデータロードの手法として、まずは「抽出」したデータをそのまま「格納」し、その後必要に応じて「変換」を行ってから別のテーブルに移すという、いわゆるELT「Extract(抽出)/Load(格納)/Transform(変換)」の処理を行うパターンがあります。

CSA JMCには、このELTのうち「Load(格納)」の処理をサポートする機能として「テーブル作成機能」と「データ連携機能」があります。

本エントリでは、S3に配置されたデータをSnowflakeにロードする際に、テーブル作成からデータ連携までを、CSA JMCを利用してどのように実現するかをご紹介したいと思います。

CSA JMCの挙動確認バージョン

当エントリの内容は以下のCSA JMCバージョンで挙動を確認しています。

  • CSA JMC v5.1.0

前提

Snowflakeとの接続設定は済んでいることを前提としています。詳細については、以下のエントリをご参照ください。

全体像について

まずは今回ご紹介する内容の全体像を説明したいと思います。

Snowflakeの「ストレージ統合」と「外部ステージ」

Snowflakeには「ストレージ統合」および「外部ステージ」とよばれるものを利用して、外部クラウドストレージ(ここではS3)を参照できる仕組みがあります。

イメージとしては以下のとおりです。

まずSnowflake上に「ストレージ統合」を作成・設定することでSnowflakeからS3バケット内のオブジェクトを参照できるようになります。

設定においてはSnowflake上での設定に加えて、AWS上でIAMロールの設定も必要となります。

その上で、作成した「ストレージ統合」を利用する「外部ステージ」をデータベース上に作成することで、以下のようにクエリベースでS3上のオブジェクトをロード対象として扱うことができるようになります。

COPY INTO テーブル名 FROM @外部ステージ名;

CSA JMCによる「テーブル作成」と「データ連携」

上記の設定により、SnowflakeからS3バケットのオブジェクトが利用可能な状態となります。

その上で、CSA JMCの「テーブル作成機能」と「データ連携機能」を利用してSnowflakeにテーブルを作成し、データをロードします。

イメージとしては以下となります。

「テーブル作成機能」ではクエリを発行してテーブルを作成します。なお、この際にはS3上の対象ファイルを解析して自動的にテーブルを定義して作成します。

次に「データ連携機能」ですが、こちらはSnowflake上に作成した外部ステージ(透過的にS3バケットのオブジェクトが参照可能)を利用して、テーブルにデータをロードします。

本エントリでは、この流れをふまえて事前準備~テーブル作成、データ連携の流れをご紹介したいと思います。

事前準備

まずは事前準備です。CSA JMCのv5.1.0では、Snowflakeの外部ステージ設定がされていることを前提としていますので、以下の通り外部ステージ設定を行います。

ストレージ統合の作成

CSA JMCにおけるサイトの単位で、ストレージ統合を作成します。この際、まだAWS上でのIAMロールの作成を行っていないので、一旦STORAGE_AWS_ROLE_ARNSTORAGE_ALLOWED_LOCATIONSはダミーの値を設定します。

ストレージ統合の名前は csa_[JMCのサイトID] としておきます。なお、JMCのサイトIDは「サイト管理 > サイト設定 > 基本情報」から確認可能です。

USE ROLE CSA;  -- Snowflake接続設定で設定しているロール
CREATE OR REPLACE STORAGE INTEGRATION csa_csa_devio_snowflake
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::0:role/role_name'
  STORAGE_ALLOWED_LOCATIONS = ('s3://')
;

成功した場合、以下のように表示されます。

統合 CSA_CSA_DEVIO_SNOWFLAKE が正常に作成されました。

もし、作成時に以下のエラーが発生する場合は、Snowflake接続設定で指定したロールに権限が不足しています。

SQLアクセス制御エラー: account 「XXXXXXX」で操作するには権限が不十分です

ACCOUNTADMINロールを利用して、Snowflake接続設定で指定したロールにCREATE INTEGRATION権限を付与します。

USE ROLE ACCOUNTADMIN;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE CSA;

STORAGE_AWS_IAM_USER_ARN と STORAGE_AWS_EXTERNAL_ID の確認

ストレージ統合が作成できたら、DESCRIBE INTEGRATIONコマンドを実行し、STORAGE_AWS_IAM_USER_ARNSTORAGE_AWS_EXTERNAL_IDの値を確認しておきます。

USE ROLE CSA;
DESCRIBE INTEGRATION csa_csa_devio_snowflake

IAMロールとポリシーの設定

AWSの管理コンソールから、IAMロールとポリシーを設定していきます。

IAMロールの作成

作成時には、信頼されたエンティティの種類として「別のAWSアカウント」を指定し、「アカウントID」には仮に自分のAWSアカウントIDを指定します。

次に「Attach アクセス権限ポリシー」には何も指定せずに次に進みます。

タグは任意のタグを設定し、ロール名は csa_[JMCのサイトID] として作成します。

信頼関係の設定

IAMロールを作成したら、改めて該当ロールを開き「信頼関係」タブの「信頼関係の編集」を行います。

ポリシードキュメントは以下のように設定します。Snowflake上でDESCRIBE INTEGRATIONコマンドを実行し、STORAGE_AWS_IAM_USER_ARNSTORAGE_AWS_EXTERNAL_IDの値を確認しましたが、PrincipalAWSにはSTORAGE_AWS_IAM_USER_ARNの値を指定し、sts:ExternalIdにはSTORAGE_AWS_EXTERNAL_IDの値を指定します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:user/foo-bar"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "XXXXXXXXXX"
        }
      }
    }
  ]
}

ポリシーの設定

最後にポリシーの設定です。「アクセス権限」タブを開き、「インラインポリシーの追加」をクリックします。

ポリシーは以下のように設定します。ここでのS3バケット名は、CSA JMCの「サイト管理 > サイト設定 > 構成要素設定」において「データ連携用S3バケット」として設定したバケットを指定します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
              "s3:PutObject",
              "s3:GetObject",
              "s3:GetObjectVersion",
              "s3:DeleteObject",
              "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::csa-devio-snowflake-copy/*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::csa-devio-snowflake-copy"
        }
    ]
}

ポリシー名は任意ですが、csa_[サイトID]_[バケット名("-" や "."は"_"に変更)] として、今回は csa_csa_devio_snowflake_csa_devio_snowflake_copy とします。

これでIAMロールは作成完了です。作成した「IAMロールのARN」と設定した「バケット名」を控えておきます。

ストレージ統合の修正

Snowflakeに戻り、ストレージ統合を修正します。

以下のようにSTORAGE_AWS_ROLE_ARNを先程作成した「IAMロールのARN」に、STORAGE_ALLOWED_LOCATIONSを設定した「バケット名」に変更します。

USE ROLE CSA;
ALTER INTEGRATION csa_csa_devio_snowflake SET STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/csa_csa_devio_snowflake';
ALTER INTEGRATION csa_csa_devio_snowflake SET STORAGE_ALLOWED_LOCATIONS = ('s3://csa-devio-snowflake-copy/');

外部ステージの作成

ストレージ統合が設定できたら、あとは外部ステージの作成です。

IAMロールに設定したインラインポリシー名と同様にして、csa_[サイトID]_[バケット名("-" や "."は"_"に変更)] として外部ステージを作成します。STORAGE_INTEGRATIONには先程作成したストレージ統合を、URLにはストレージ統合に設定したバケット名のURLを指定します。

また、外部ステージはデータベースに属するので、USE DATABASEでSnowflake接続で指定したデータベースを利用するようにします。

USE ROLE CSA;
USE DATABASE CSA;
CREATE STAGE csa_csa_devio_snowflake_csa_devio_snowflake_copy
  STORAGE_INTEGRATION = csa_csa_devio_snowflake
  URL = 's3://csa-devio-snowflake-copy/'
;

これでSnowflakeからS3バケットを外部ステージとして利用する設定ができました。

データファイルの配置

あとは、利用するデータファイルをS3バケットに配置しておきます。

今回は以下のようなCSVファイルをs3://csa-data-bucket/init/ootaka_sample/ootaka_sample.csvへ配置しました。

ootaka_sample.csv

id,val
1,val1
2,val2
3,val3

これですべての準備ができました!

本題の確認

さて、ここからようやく本題に入ります。

本エントリのテーマの再確認ですが、「S3に配置されたデータをSnowflakeにロードする際に、テーブル作成からデータ連携までを、CSA JMCを利用してどのように実現するか」になります。

「S3に配置されたデータ」は、先程のootaka_sample.csvです。このファイルをSnowflakeにロードする際に、「テーブル作成」と「データ連携」をどう実現するかという話になります。

まずは「テーブル作成」からです。

ファイルからテーブルを作成したい

ファイルからテーブルを作成するには、CSA JMCのメニュー「リソース > テーブル作成(ファイルから)」から作成します。

設定は画像の通りです。主要な設定については以下となります。

  • テーブル名
    • 任意のテーブル名です。今回はootaka_sampleとしました。
  • バケット名
    • データを配置したバケット名です。
  • 外部ステージ
    • 先程Snowflake上に作成した外部ステージを選択します。
  • S3ファイルパス
    • データを配置したバケット内のファイルパスです。

この設定で「作成」ボタンをクリックすると、テーブルの作成が「ジョブ」として開始され、ジョブが成功するとSnowflake上にテーブルが作成されていることが確認できます。

定期的にデータロードを実行したい

次に、「データ連携」についてです。

今回はテーブル作成時に指定したS3パスに、毎日午前6時までに定期的にデータがアップロードされるという前提で考えます。この場合、午前6時にはデータ連携を開始可能なので固定パスのファイルを毎日午前6時にSnowflake上のテーブルにロードするように設定してみます。

CSA JMCのメニュー「構成要素 > データ連携」から新規にデータ連携を作成します。設定は以下のようにします。

主要な設定については以下となります。

  • スキーマ名
    • データロードを行うテーブルのスキーマを指定します。
  • テーブル名
    • データロードを行うテーブルootaka_sampleを指定します。
  • バケット名
    • データを配置したバケット名です。
  • 外部ステージ
    • Snowflake上に作成した外部ステージを選択します。
  • S3ファイルパスを固定指定にする
    • 特定のS3パスにファイルが定期的にアップロードされる前提なので、固定とします。
  • S3ファイルパス
    • データが配置されるバケット内のファイルパスです。
  • 取り込み方式
    • 今回は「全件洗替」としています。

設定したら「保存」して、「ジョブ > ジョブ一覧」から「ジョブの追加」に進みます。以下のように「構成要素を実行」するジョブを追加します。

ジョブを作成したら、「実行予定時刻」を「日次」の「06:00」に指定します。これで定期実行スケジュールを指定できました。

次に「構成要素」の「編集」をクリックして、先程作成した「データ連携」を追加して保存します。これにより、先程設定した通りの設定でS3上のファイルがSnowflakeへ連携されます。

最後に、ジョブ一覧画面に戻ってジョブを有効化して完了です。これで定期的にジョブが実行され、データロードされるようになりました。

試しにジョブを実行すると、このようにS3上のデータがSnowflakeのテーブルにロードされていることが確認できます。

まとめ

以上、S3に配置されたデータをSnowflakeにロードする際に、テーブル作成からデータ連携までを、CSA JMCを利用してどのように実現するかをご紹介しました。

どなたかのお役に立てば幸いです。それでは!