AWS CDKでAmazon Athena Parameterized Queriesを作ってみた

2022.07.10

こんにちは、CX事業本部 IoT事業部の若槻です。

Amazon Athenaでは、Parameterized Queriesを使用することにより、あらかじめ作成したQuery Statementに対して実行時にParameter値を渡して実行することができます。

Parameterized QueriesのStatementは下記のようにParameterのプレースメントホルダーを?で指定する構文となります。

SELECT * FROM "my_database"."my_table"
WHERE year = ? and month= ? and day= ?

今回は、このAmazon Athena Parameterized QueriesをAWS CDKで作ってみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  RemovalPolicy,
  Stack,
  StackProps,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // ソースデータ格納バケット
    const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
      bucketName: `data-${this.account}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // データカタログ
    const dataCatalog = new glue.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // データカタログテーブル
    const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
      tableName: 'source_data_glue_table',
      database: dataCatalog,
      bucket: sourceDataBucket,
      s3Prefix: 'data/',
      partitionKeys: [
        {
          name: 'date',
          type: glue.Schema.STRING,
        },
      ],
      dataFormat: glue.DataFormat.JSON,
      columns: [
        {
          name: 'userId',
          type: glue.Schema.STRING,
        },
        {
          name: 'count',
          type: glue.Schema.INTEGER,
        },
      ],
    });

    // データカタログテーブルへのPartition Projectionの設定
    (sourceDataGlueTable.node.defaultChild as any).tableInput.parameters = {
      'projection.enabled': true,
      'projection.date.type': 'date',
      'projection.date.range': '2022/06/20,NOW',
      'projection.date.format': 'yyyy/MM/dd',
      'projection.date.interval': 1,
      'projection.date.interval.unit': 'DAYS',
      'storage.location.template':
        `s3://${sourceDataBucket.bucketName}/data/` + '${date}',
    };

    // Athenaワークグループ
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Prepared Queries
    new aws_athena.CfnPreparedStatement(this, 'athenaPreparedStatement', {
      statementName: 'athenaPreparedStatement',
      workGroup: athenaWorkGroup.name,
      queryStatement: `
        SELECT *
        FROM ${dataCatalog.databaseName}.${sourceDataGlueTable.tableName}
        WHERE date = ?
        LIMIT 10`,
    });
  }
}

上記をCDK Deployしてスタックをデプロイします。

Deploy後にget-prepared-statementコマンドでPrepared Queriesを取得してみると、作成されていることが確認できます。

$ aws athena get-prepared-statement \
  --work-group athenaWorkGroup \
  --statement-name athenaPreparedStatement
{
    "PreparedStatement": {
        "StatementName": "athenaPreparedStatement",
        "QueryStatement": "SELECT *\n        FROM data_catalog.source_data_glue_table\n        WHERE date = ?\n        LIMIT 10",
        "WorkGroupName": "athenaWorkGroup",
        "LastModifiedTime": "2022-07-10T22:50:56.049000+09:00"
    }
}

環境準備

データソースのLocationにデータをアップロードします。

$ cat data1
{"userId":"u001","count":3}
{"userId":"u001","count":1}
{"userId":"u002","count":5}
{"userId":"u002","count":8}
{"userId":"u003","count":2}

$ cat data2
{"userId":"u001","count":14}
{"userId":"u002","count":10}
{"userId":"u003","count":12}

$ cat data3
{"userId":"u001","count":4}
{"userId":"u002","count":4}
{"userId":"u003","count":0}

aws s3 cp data1 s3://${BUCKET_NAME}/data/2022/06/26/data1
aws s3 cp data2 s3://${BUCKET_NAME}/data/2022/06/27/data2
aws s3 cp data3 s3://${BUCKET_NAME}/data/2022/06/28/data3

動作確認

AthenaのQuery Editorで、先程作成したParameterized Queriesの実行をしてみます。

その際のパラメータの指定方法は2通りあります。

Parameter値をメニューから指定する

まずQuery StatementをEXECUTE statement_nameとして実行する場合。

この場合は、Query EditorでParameter入力メニューを使用してParameter値を指定するので、下記のようにQuery Statementを実行します。

EXECUTE athenaPreparedStatement

EditorでQuery Statementを指定して実行(Run)すると、Editor右側にParameter入力メニューが出てくるので、値を指定して実行(Run)します。

指定したParameter値を使用したQueryが実行されて結果が取得できました!

Parameter値をStatement内で指定する

次にQuery StatementをEXECUTE statement_name [USING value1 [ ,value2, ... ] ]として実行する場合。

この場合は、Statement内にParameter値を直接指定するので、入力メニューは使用しません。下記のようにQuery Statementを実行します。

EXECUTE athenaPreparedStatement USING '2022/06/28'

EditorでQuery Statementを指定して実行(Run)すると、結果が取得できました!

参考

以上