Glue StudioでSnowflake connectivityを使ってSnowflakeとS3のデータ連携をしてみた
データアナリティクス事業本部 機械学習チームの鈴木です。
Amazon S3とSnowflake間のデータ連携の方法を改めて探していたのですが、ちょうどSnowflake connectivityの一般提供を開始したアナウンスがあったので試してみました。AWS Glue for Apache SparkのSnowflakeへのネイティブな接続の仕組みになります。
この記事について
概要
まず、以下の順番を踏みつつ、Glue Sudioから利用できるSnowflakeのターゲットを使い、S3にあるデータをSnowflakeにアップロードするGlueジョブの作成と動作確認をしてみました。
- Snowflake側でデータベースおよびユーザーの準備をする。
- AWS Secrets ManagerとGlue接続で接続情報を登録する。Glueジョブで使うIAMロールを作成する。
- Glueジョブを作成する。
- GlueジョブでデータをSnowflakeにロードする。
以下の『Connecting to Snowflake in AWS Glue Studio (preview)』ガイドを参考に設定しました。
※ これは冒頭のアナウンスページで紹介されていたガイドですが、記事執筆時点ではpreviewの文言があったため、そのように記載しています。使用する際はご留意下さい。
その後、Snowflakeのソースを使い、S3にデータを取得する逆方向の連携のGlueジョブも作成してみました。
前提
Glueジョブでデータを読み書きするS3バケットはすでに作成されているものとします。
S3バケットからSnowflakeに送るデータについては、以下のブログで用意した、UCI Machine Learning RepositoryのIris Data Setを使用しました。
下記リンクにて公開されています。
https://archive.ics.uci.edu/ml/datasets/iris
データをダウンロードし、iris.data
をS3バケットにアップロードし、iris_quality_check
というGlueテーブルを作成しておきました。
iris.data
は元データの最後に空行が2行入っており、Glueジョブで処理する際に1行分空のレコードとして処理されてしまったため、空行は1行削除しておきました。
Snowflakeへのデータの連携
1. Snowflake側の準備
データベースの作成
Snowsightで、SYSADMINにロールを切り替えて、データベース画面からML_DATABASE
という名前でデータベースを作成しました。
テーブル作成
ML_DATABASE.PUBLIC.IRIS
テーブルを作成しました。
Snowsightで、SQLワークシートを開き、ワークシート左上のデータベース・スキーマを選択する箇所でML_DATABASE.PUBLIC
を選びました。
SYSADMINで以下のSQLを実行し、テーブルを作成しました。
CREATE OR REPLACE TABLE IRIS ( sepal_length FLOAT, sepal_width FLOAT, petal_length FLOAT, petal_width FLOAT, class STRING );
ユーザーへのロールの作成
ML_DATABASE.PUBLIC.IRIS
テーブルに対する一連の権限を持ったGLUEJOB_OPERATOR
ユーザーを作成しました。
Snowsightで、SQLワークシートを開き、ワークシート左上のデータベース・スキーマを選択する箇所でML_DATABASE.PUBLIC
を選びました。
ACCOUNTADMINで以下のSQLを実行し、ロールとユーザーを作成しました。
-- ROLEの作成 CREATE ROLE GLUEJOB_ROLE; -- ウェアハウスへのUSAGEアクセス権の付与 GRANT USAGE ON WAREHOUSE 使用するウェアハウス TO ROLE GLUEJOB_ROLE; -- データベースへのUSAGEアクセス権の付与 GRANT USAGE ON DATABASE ML_DATABASE TO ROLE GLUEJOB_ROLE; -- スキーマへのアクセス権の付与 GRANT USAGE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE GLUEJOB_ROLE; GRANT CREATE TABLE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE GLUEJOB_ROLE; -- テーブルへの権限の付与 GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE ON ALL TABLES IN SCHEMA PUBLIC TO ROLE GLUEJOB_ROLE; -- ユーザーの作成 CREATE USER GLUEJOB_OPERATOR PASSWORD = 'パスワード' DEFAULT_ROLE = 'GLUEJOB_ROLE' DEFAULT_WAREHOUSE = '使用するウェアハウス' MUST_CHANGE_PASSWORD = FALSE; -- ユーザーへのロールの付与 GRANT ROLE GLUEJOB_ROLE TO USER GLUEJOB_OPERATOR;
ポイントとしては以下です。
DEFAULT_WAREHOUSE
はロードする際に参照するので設定する。- Glueジョブから
CREATE TABLE
など発行するので、スキーマに対してその操作を行えるように権限をつける必要がある。Glue Stduioで生成されたPySparkのスクリプトに記載のSQL文をみて判断する。
処理実行後、意図通りに権限がついているか、SHOW GRANTS
コマンドで確認するとよいかもしれません。
SHOW GRANTS TO ROLE GLUEJOB_ROLE;
2. Glueジョブ周辺リソースの準備
シークレットの作成
Snowflakeでユーザーを作成したので、Glueジョブから使えるようにシークレットを作成しました。
AWS Secrets Managerより、新しいシークレットを保存する
を押して作成しました。
ポイントとして、ステップ1 シークレットのタイプを選択
で、以下のように設定しました。
シークレットのタイプはその他のシークレットのタイプ
を選択しました。これはほかのタイプで該当するものがなさそうだったので消去法的に選びました。
sfUser
にSnowflakeで作成したユーザー名、sfPassword
にそのパスワードを入れました。このユーザー名・パスワードのキーは、先に紹介したガイドにて指定があったものになります。
ステップ2以降は、名前だけは後から分かるものに設定して、残りはデフォルトのままとしました。
Glue接続の作成
Glueのコンソールより、以下のようにGlue接続を作成しました。
- コネクションタイプは
Snowflake
にしました。 - Snowflake URLは「https://アカウント識別子.snowflakecomputing.com」としました。
アカウント識別子
はorgname-account_name
という形式にしており、作り方は『アカウント識別子 | Snowflake Documentation』のガイドをご確認ください。 - AWS Secretは先に作成したものを使用しました。
Snowflake role
はoptionalですが、念の為設定しておきました。
Glueジョブ用のIAMロールの作成
以下のCloudFormationテンプレートを使い、Glueジョブで使うIAMロールを作成しました。
Parameters: DataBucketName: Description: Backet Name for Sample Data. Type: String SecretARN: Description: ARN of secret for snowflake user. Type: String Resources: GlueStudioExecutionRole: Type: AWS::IAM::Role Properties: RoleName: AWSGlueServiceRole-Studio-CM-nayuts AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: glue.amazonaws.com Action: "sts:AssumeRole" Path: "/" Policies: - PolicyName: GlueStudioSecretManagerAccessPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: [ "secretsmanager:GetSecretValue" ] Resource: [ !Ref SecretARN ] - PolicyName: GlueStudioS3AccessPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: [ "s3:*" ] Resource: [ !Sub "arn:aws:s3:::${DataBucketName}", !Sub "arn:aws:s3:::${DataBucketName}/*" ] ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
DataBucketName
で指定するデータがあるバケットへの権限については、Snowflakeに書き込む際と読み込む際の両方を考えて広めにつけました。運用の際にもう少し狭めたい場合は適宜修正してください。
3. Glueジョブの作成
Glue StudioにてGlueジョブを作成しました。
まず、以下のようにSnowflakeへのロード用のジョブを作成しました。
S3からデータを取得するノードは以下のようにしました。Glueテーブルからデータを読み出す形としました。
Snowflakeへデータをロードするノードは以下のようにしました。
Handling of data and target table
のオプションは充実しており、今回はTRUNCATE target table
にしました。実行のたびにテーブルを空にしてデータを入れてくれるので、洗い替え処理の実装にも使えますね。
Job details
タブでIAM Roleとワーカー数の設定をしました。IAM Roleは先に作成したものを指定しました。ワーカー数は小さいデータなので最小にしました。これは実際のデータ量により調整する必要があります。
Glue versionの箇所では、Snowflake connectionsに対応するGlueバージョンついての補足がでていますね。
これでGlueジョブの準備は終わりです。
ちなみに、生成されたPySparkのスクリプトは以下のようになっていました。データベース名
は置き換えてあります。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue import DynamicFrame args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 bucket extract S3bucketextract_node1 = glueContext.create_dynamic_frame.from_catalog( database="データベース名", table_name="iris_quality_check", transformation_ctx="S3bucketextract_node1", ) # Script generated for node Snowflake load Snowflakeload_node1690612413266 = glueContext.write_dynamic_frame.from_options( frame=S3bucketextract_node1, connection_type="snowflake", connection_options={ "autopushdown": "on", "dbtable": "iris", "connectionName": "cm-nayuts-snowflake-connection", "preactions": "CREATE TABLE IF NOT EXISTS PUBLIC.iris (sepal_length VARCHAR, sepal_width VARCHAR, petal_length VARCHAR, petal_width VARCHAR, class VARCHAR); TRUNCATE TABLE PUBLIC.iris;", "sfDatabase": "ml_database", "sfSchema": "PUBLIC", }, transformation_ctx="Snowflakeload_node1690612413266", ) job.commit()
特にconnection_options
に設定されているSQL文は、Snowflakeで発生したエラーが分かりやすくなるので確認すると良いと思います。
4. AWS Glueでのデータのロード
Glue StudioのRun
ボタンを押してジョブを実行しました。
Runs
タブでRun status
がSucceeded
になれば成功です。
Snowsightからも確かにデータが入っていることを確認できました。
クエリの実行結果は、作成したユーザーのクエリ履歴から確認できます。
補足
Snowflakeへの接続でユーザーの権限が足りないなどの不備がおきがちですが、Snowsighのクエリ履歴から確認すると解決を進めやすかったです。
例えば、GlueジョブでAn error occurred while calling o98.pyWriteDynamicFrame. SQL access control error:
というエラーで失敗してしまい、なにがダメだったんだろうとしばらく悩みました。
クエリ履歴を確認するとSQL access control error: Insufficient privileges to operate on schema 'PUBLIC'
と出ていたため、スキーマに対する操作の権限が何か足りていなかったということが分かりました。
なお、これはCREATE TABLE
を実行する際に、Snowflakeのユーザーにスキーマへのテーブル作成の権限が付与されていなかったためでした。先に紹介したユーザーの作成のコマンドでは、これを許可するようにしてあります。
Snowflakeからのデータの取得
Snowflakeへのデータの連携が無事にできたので、接続設定などは流用しつつ、Snowflakeからのデータの取得も試してみました。
1. Glueジョブの作成
以下のようなジョブを作成しました。
snowflakeからのデータ取得用ノードの設定です。
S3バケットへのデータ格納用ノードの設定です。結果確認が簡単なので、今回はCSV形式にしました。
生成されたPySparkのスクリプトは以下のようになっていました。格納先のバケット名
は置き換えてあります。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node Snowflake extract Snowflakeextract_node1690636314100 = glueContext.create_dynamic_frame.from_options( connection_type="snowflake", connection_options={ "autopushdown": "on", "dbtable": "iris", "connectionName": "cm-nayuts-snowflake-connection", "sfDatabase": "ml_database", "sfSchema": "PUBLIC", }, transformation_ctx="Snowflakeextract_node1690636314100", ) # Script generated for node S3 bucket load S3bucketload_node3 = glueContext.write_dynamic_frame.from_options( frame=Snowflakeextract_node1690636314100, connection_type="s3", format="csv", connection_options={ "path": "s3://格納先のバケット名/from_snowflake/", "partitionKeys": [], }, transformation_ctx="S3bucketload_node3", ) job.commit()
2. Glueジョブの実行
Run
ボタンからジョブを実行し、成功することを確認しました。
S3バケットの出力先を確認すると、以下のように20個のファイルが出力されていました。
試しに一つ開いてみると、データが正しく取得できていそうなことが確認できました。
SEPAL_LENGTH,SEPAL_WIDTH,PETAL_LENGTH,PETAL_WIDTH,CLASS 5.1,2.5,3.0,1.1,Iris-versicolor 6.8,3.0,5.5,2.1,Iris-virginica 4.4,2.9,1.4,0.2,Iris-setosa 6.0,3.4,4.5,1.6,Iris-versicolor 5.5,3.5,1.3,0.2,Iris-setosa 7.2,3.6,6.1,2.5,Iris-virginica 5.8,2.6,4.0,1.2,Iris-versicolor
ユースケースの考察
非常に簡単にAWSとSnowflakeの間のデータ連携ができることが確認できました。今回試したことを踏まえて、どのようなユースケースで使えそうか、例を考えてみました。
Snowflakeへのデータの連携
AWSにたまっているデータをSnowflakeへ連携する際に、Snowflakeへデータをロードできます。今回試したようにそのまま連携するのでも問題ありませんが、例えばGlueジョブにて個人情報などのマスクをした後で、Snowflakeに移すようなことが可能です。
Glueジョブによる個人情報のマスクについては以下で取り上げました。
Glueジョブを挟むことで、GUI操作にて誰でも大規模なデータに対する加工を設定してSnowflakeに連携できます。機密性の高い情報はAWSに閉じて処理してしまい、Snowflake側でほかの社内SnowflakeアカウントとSecure Data Sharingでデータ共有するというような使い方にしても安心でよいですね。
Snowflakeからのデータの取得
例えば、AWS側の機械学習リソースを使いたいときが考えてみました。
システム化する場合や、Snowflake側で機械学習用途に特化したデータマートを提供せずAWS側で追加の加工が必要な場合は、Glueジョブでデータを取得してPySparkで加工・結合などをした後、S3をインターフェースにSageMakerで利用するようなことができます。
アドホックに進めたい検証フェーズなど場合は、Snowpark Pythonなどでデータを直接Snowflakeから取得してもよいかもしれません。
最後に
AWS Glue for Apache Sparkで一般提供が開始された、Snowflakeへの接続を使って、S3との間で相互のデータ連携を試してみました。
特に認証情報や権限まわりでAWSとSnowflakeの双方で設定が必要ではありましたが、Glue Studioを使うことで非常に簡単に相互のデータ連携を実現することができました。
参考になりましたら幸いです。
そのほかに参考にした資料
- snowflake cloud data platform - Insufficient privileges to operate on schema 'PUBLIC' - Stack Overflow
- 爆速でSnowflakeの外部ステージ(S3)連携を設定するためのCloudFormationテンプレートを作ってみた | DevelopersIO
- CREATE USER | Snowflake Documentation
- GRANT | Snowflake Documentation
- SHOW GRANTS | Snowflake Documentation
- Glue Studioのネイティブ機能でSnowflakeに接続する(プレビュー機能) - Qiita
- 特にプレビュー時点からの経緯が分かり大変参考になりました。ありがとうございました。