Amazon AthenaデータソースコネクタでSnowflakeのデータにクエリする
データアナリティクス事業本部インテグレーション部コンサルティングチーム新納(にいの)です。
Amazon Athenaではデータソースコネクタがサポートされており、Federated Queryを経由して外部のデータソースにアクセス可能です。本エントリでは、Snowflakeへのクエリについて解説します。
前提条件
今回の検証環境で使用した構成とポイントは以下の通りです。
- Snowflakeの認証情報をAWS Secrets Managerに格納します。
- Nat Gatewayを経由してSnowflakeとAWS Secrets Managerにアクセスします。
以下エントリと同様の構成です。
Snowflakeの認証情報をSecrets Managerに保存
Snowflakeのユーザー名・パスワードをSecrets Managerに登録します。今回は「その他のシークレットのタイプ」を選択し、username
、password
をタグに指定してSnowflakeのユーザー名・パスワードを入力します。
Secrets nameを入力して保存します。
Secrets ManagerのARNは後ほど使用するのでコピーしておきましょう。
データソースコネクタのデプロイ
AWSマネジメントコンソールにログインし、Serverless Application Repositoryにアクセスします。「AthenaSnowflakeConnector」で検索してデータソースコネクタにアクセスします。
項目を以下の通り入力していきます。
項目 | 値 |
---|---|
アプリケーション名 | AthenaSnowflakeConnector(デフォルト値) |
SecretNamePrefix | Secrets ManagerのARN |
SpillBucket | 一時的なデータの退避をするS3バケット名 |
DefaultConnectionString | snowflake://jdbc:snowflake://<Snowflakeインスタンス名>:<ポート番号>/?warehouse=<ウェアハウス名>&db=<データベース名>&schema=<スキーマ名>&${Secrets Managerのシークレット名} |
DisableSpillEncryption | false(デフォルト値) |
LambdaFunctionName | 任意のLambda関数名。Athenaでクエリする際に使用します |
LambdaMemory | 3008(デフォルト値) |
LambdaTimeout | 900(デフォルト値) |
PageCount | 500000(デフォルト値) |
SecurityGroupIds | Lambda関数に適用するセキュリティグループID |
SpillPrefix | SpillBucket内のプレフィックス |
SubnetIds | Lambda関数をデプロイするサブネット |
デプロイが完了すると以下のような画面が表示されます。
上記画面の「JdbcConnectionConfigRole」をクリックしてIAMロールの画面に移動し、Secrets Managerに保存したSnowflakeの認証情報を取得できるようにSecretsManagerReadWrite
のIAMポリシーを付与します。
AthenaからSnowflakeのデータにクエリする
これでクエリする準備は整いました。AWSマネジメントコンソールからAmazon Athenaへアクセスし、SnowflakeのDBにクエリしてみます。以下のような形でクエリします。
SELECT * FROM "lambda:<LambdaFunctionName>".<スキーマ名>."<テーブル名>;
ところが、このまま実行するとEncountered an exception java.lang.NumberFormatException from your LambdaFunction
というエラーメッセージが表示されます。
CloudWatchログをエラーメッセージで検索すると、どうやらgetPartitions()
の処理でNumberFormatExceptionエラーが発生しているようです。
ソースコードは以下GitHubにて公開されています。このソースをgetPartitions
でgrepします。
SnowflakeMetadataHandler.java
内に処理があることがわかります。
% grep -r 'getPartitions' . : : ./src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java: public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTableLayoutRequest, ./src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java: Block partitions = getSplitsRequest.getPartitions(); : :
コードの中を見ると、設定可能なパーティション数を制限するpartitionlimit
の設定が不足してエラーになっているようです。
public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTableLayoutRequest, QueryStatusChecker queryStatusChecker) throws Exception { LOGGER.info("{}: Schema {}, table {}", getTableLayoutRequest.getQueryId(), getTableLayoutRequest.getTableName().getSchemaName(), getTableLayoutRequest.getTableName().getTableName()); Map<String, String> properties = System.getenv(); /** * Customized environment variable "pagecount" for pagination based partition. It is currently set to 500000. * It means there will be 500000 rows per partition. The number of partition will be total number of rows divided by * pagecount variable value. */ String pagecount = properties.get("pagecount"); Long totalpagecount = Long.valueOf(pagecount); /** * Customized environment variable "partitionlimit" to limit the number of partitions. * this is to handle timeout issues because of huge partitions */ String partitionlimit = properties.get("partitionlimit"); Long totalPartitionlimit = Long.valueOf(partitionlimit); LOGGER.info(" Total Partition Limit" + totalPartitionlimit); LOGGER.info(" Total Page Count" + totalpagecount);
デプロイ時に指定できればいいのですが、2022/5/6現在、設定可能な項目はデプロイ画面に存在していませんので、手動でpartitionlimit
を設定してエラーを回避します。
先ほどデプロイしたデータソースコネクタのLambda関数の画面に移動し、「設定」→「環境変数」→「編集」画面からキーをpartitionlimit
、値を任意の数値(今回は10)に設定して保存します。
この状態で再度実行すると値が取得できました。
なお、SnowflakeにMFA認証を有効化している場合、クエリを発行するとDUO Mobileから複数回通知が発行されます。MFA認証を一時的に無効にしてクエリを試したい場合は、Snowflake側で以下のクエリを発行します。(実行にはACCOUNTADMIN権限が必要です。)
ALTER USER IF EXISTS <username> SET MINS_TO_BYPASS_MFA = <integer of min>
最後に
AthenaからSnowflakeのデータにクエリする方法のご紹介でした。AthenaからSnowflakeのデータにクエリすることで、AthenaのデータとSnowflakeのデータをブレンディングでき、分析の幅が広がりますね。 上述の通り、Snowflake側でMFA設定がされているとかなりの回数の通知が送信されますので、Athenaから接続する用のユーザーをMFA有効化せずに作成しておく方がよさそうです。