Amazon AthenaデータソースコネクタでSnowflakeのデータにクエリする

AthenaのデータソースコネクタでSnowflakeにクエリするにはすこし工夫が必要です
2022.05.06

データアナリティクス事業本部インテグレーション部コンサルティングチーム新納(にいの)です。

Amazon Athenaではデータソースコネクタがサポートされており、Federated Queryを経由して外部のデータソースにアクセス可能です。本エントリでは、Snowflakeへのクエリについて解説します。

前提条件

今回の検証環境で使用した構成とポイントは以下の通りです。

  • Snowflakeの認証情報をAWS Secrets Managerに格納します。
  • Nat Gatewayを経由してSnowflakeとAWS Secrets Managerにアクセスします。

以下エントリと同様の構成です。

Snowflakeの認証情報をSecrets Managerに保存

Snowflakeのユーザー名・パスワードをSecrets Managerに登録します。今回は「その他のシークレットのタイプ」を選択し、usernamepasswordをタグに指定してSnowflakeのユーザー名・パスワードを入力します。

Secrets nameを入力して保存します。

Secrets ManagerのARNは後ほど使用するのでコピーしておきましょう。

データソースコネクタのデプロイ

AWSマネジメントコンソールにログインし、Serverless Application Repositoryにアクセスします。「AthenaSnowflakeConnector」で検索してデータソースコネクタにアクセスします。

項目を以下の通り入力していきます。

項目
アプリケーション名 AthenaSnowflakeConnector(デフォルト値)
SecretNamePrefix Secrets ManagerのARN
SpillBucket 一時的なデータの退避をするS3バケット名
DefaultConnectionString snowflake://jdbc:snowflake://<Snowflakeインスタンス名>:<ポート番号>/?warehouse=<ウェアハウス名>&db=<データベース名>&schema=<スキーマ名>&${Secrets Managerのシークレット名}
DisableSpillEncryption false(デフォルト値)
LambdaFunctionName 任意のLambda関数名。Athenaでクエリする際に使用します
LambdaMemory 3008(デフォルト値)
LambdaTimeout 900(デフォルト値)
PageCount 500000(デフォルト値)
SecurityGroupIds Lambda関数に適用するセキュリティグループID
SpillPrefix SpillBucket内のプレフィックス
SubnetIds Lambda関数をデプロイするサブネット

デプロイが完了すると以下のような画面が表示されます。

上記画面の「JdbcConnectionConfigRole」をクリックしてIAMロールの画面に移動し、Secrets Managerに保存したSnowflakeの認証情報を取得できるようにSecretsManagerReadWriteのIAMポリシーを付与します。

AthenaからSnowflakeのデータにクエリする

これでクエリする準備は整いました。AWSマネジメントコンソールからAmazon Athenaへアクセスし、SnowflakeのDBにクエリしてみます。以下のような形でクエリします。

SELECT * FROM "lambda:<LambdaFunctionName>".<スキーマ名>."<テーブル名>;

ところが、このまま実行するとEncountered an exception java.lang.NumberFormatException from your LambdaFunctionというエラーメッセージが表示されます。

CloudWatchログをエラーメッセージで検索すると、どうやらgetPartitions()の処理でNumberFormatExceptionエラーが発生しているようです。

ソースコードは以下GitHubにて公開されています。このソースをgetPartitionsでgrepします。

SnowflakeMetadataHandler.java内に処理があることがわかります。

% grep -r 'getPartitions' .
:
:
./src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java:    public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTableLayoutRequest,
./src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeMetadataHandler.java:        Block partitions = getSplitsRequest.getPartitions();
:
:

コードの中を見ると、設定可能なパーティション数を制限するpartitionlimitの設定が不足してエラーになっているようです。

    public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTableLayoutRequest,
                              QueryStatusChecker queryStatusChecker) throws Exception
    {
        LOGGER.info("{}: Schema {}, table {}", getTableLayoutRequest.getQueryId(), getTableLayoutRequest.getTableName().getSchemaName(),
                getTableLayoutRequest.getTableName().getTableName());
        Map<String, String> properties = System.getenv();
        /**
         * Customized environment variable "pagecount" for pagination based partition. It is currently set to 500000.
         * It means there will be 500000 rows per partition. The number of partition will be total number of rows divided by
         * pagecount variable value.
         */
        String pagecount = properties.get("pagecount");
        Long totalpagecount = Long.valueOf(pagecount);
    
        /**
         * Customized environment variable "partitionlimit" to limit the number of partitions.
         * this is to handle timeout issues because of huge partitions
         */
        String partitionlimit = properties.get("partitionlimit");
        Long totalPartitionlimit = Long.valueOf(partitionlimit);
    
        LOGGER.info(" Total Partition Limit" + totalPartitionlimit);
        LOGGER.info(" Total Page  Count" +  totalpagecount);

デプロイ時に指定できればいいのですが、2022/5/6現在、設定可能な項目はデプロイ画面に存在していませんので、手動でpartitionlimitを設定してエラーを回避します。 先ほどデプロイしたデータソースコネクタのLambda関数の画面に移動し、「設定」→「環境変数」→「編集」画面からキーをpartitionlimit、値を任意の数値(今回は10)に設定して保存します。

この状態で再度実行すると値が取得できました。

なお、SnowflakeにMFA認証を有効化している場合、クエリを発行するとDUO Mobileから複数回通知が発行されます。MFA認証を一時的に無効にしてクエリを試したい場合は、Snowflake側で以下のクエリを発行します。(実行にはACCOUNTADMIN権限が必要です。)

ALTER USER IF EXISTS <username> SET MINS_TO_BYPASS_MFA = <integer of min>

最後に

AthenaからSnowflakeのデータにクエリする方法のご紹介でした。AthenaからSnowflakeのデータにクエリすることで、AthenaのデータとSnowflakeのデータをブレンディングでき、分析の幅が広がりますね。 上述の通り、Snowflake側でMFA設定がされているとかなりの回数の通知が送信されますので、Athenaから接続する用のユーザーをMFA有効化せずに作成しておく方がよさそうです。

参考資料