AWS Step FunctionsからAmazon Athena Parameterized Queriesを実行する構成を作ってみた(AWS CDK)

2022.07.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

前回のエントリでは、Amazon Athena Parameterized QueriesをCDKで作ってみました。

今回はその発展編として、Amazon Athena Parameterized QueriesをAWS Step Functionsから実行する構成をAWS CDKで作ってみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  aws_stepfunctions_tasks,
  aws_stepfunctions,
  RemovalPolicy,
  Stack,
  StackProps,
  aws_iam,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // ソースデータ格納バケット
    const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
      bucketName: `data-${this.account}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // データカタログ
    const dataCatalog = new glue.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // データカタログテーブル
    const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
      tableName: 'source_data_glue_table',
      database: dataCatalog,
      bucket: sourceDataBucket,
      s3Prefix: 'data/',
      partitionKeys: [
        {
          name: 'date',
          type: glue.Schema.STRING,
        },
      ],
      dataFormat: glue.DataFormat.JSON,
      columns: [
        {
          name: 'userId',
          type: glue.Schema.STRING,
        },
        {
          name: 'count',
          type: glue.Schema.INTEGER,
        },
      ],
    });

    // データカタログテーブルへのPartition Projectionの設定
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    (sourceDataGlueTable.node.defaultChild as any).tableInput.parameters = {
      'projection.enabled': true,
      'projection.date.type': 'date',
      'projection.date.range': '2022/06/20,NOW',
      'projection.date.format': 'yyyy/MM/dd',
      'projection.date.interval': 1,
      'projection.date.interval.unit': 'DAYS',
      'storage.location.template':
        `s3://${sourceDataBucket.bucketName}/data/` + '${date}',
    };

    // Athenaワークグループ
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Prepared Queries
    const athenaPreparedStatement = new aws_athena.CfnPreparedStatement(
      this,
      'athenaPreparedStatement',
      {
        statementName: 'athenaPreparedStatement',
        workGroup: athenaWorkGroup.name,
        queryStatement: `
        SELECT *
        FROM ${dataCatalog.databaseName}.${sourceDataGlueTable.tableName}
        WHERE date = ?
        LIMIT 10`,
      },
    );

    // Athenaクエリ実行開始
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.format(
            `EXECUTE ${athenaPreparedStatement.statementName} USING '{}'`,
            aws_stepfunctions.JsonPath.stringAt('$.query_dt'),
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
        },
      );

    // ステートマシン
    const stateMachine = new aws_stepfunctions.StateMachine(
      this,
      'stateMachine',
      {
        stateMachineName: 'stateMachine',
        definition: startAthenaQueryExecutionTask,
      },
    );

    // Glue Data Catalogリソースへのアクセス権をState Machineに追加
    stateMachine.role.addToPrincipalPolicy(
      new aws_iam.PolicyStatement({
        actions: ['glue:Get*', 'glue:List*'],
        effect: aws_iam.Effect.ALLOW,
        resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn],
      }),
    );

    // PreparedStatementを実行するためのアクセス権をState Machineに追加
    stateMachine.addToRolePolicy(
      new aws_iam.PolicyStatement({
        actions: ['athena:GetPreparedStatement'],
        effect: aws_iam.Effect.ALLOW,
        resources: [
          `arn:aws:athena:${this.region}:${this.account}:workgroup/${athenaWorkGroup.name}`,
        ],
      }),
    );
  }
}
  • AthenaStartQueryExecution Taskで、Query String EXECUTE ${athenaPreparedStatement.statementName} USING '{}'によりParameterized Queriesを実行しています。
    • この時、文字列型のParameter値の指定は'{}'のようにシングルクォートで囲む必要があります。
  • State MachineはWorkgroupに対してathena:GetPreparedStatement Actionを行うので、Policyを付与します。

上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。

State Machine Definition

{
  "StartAt": "startAthenaQueryExecutionTask",
  "States": {
    "startAthenaQueryExecutionTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Parameters": {
        "QueryString.$": "States.Format('EXECUTE athenaPreparedStatement USING {}', $.query_dt)",
        "ResultConfiguration": {},
        "WorkGroup": "athenaWorkGroup"
      }
    }
  }
}

動作確認

次の入力を指定してステートマシンを実行します。

Input

{
  "query_dt": "2022/06/28"
}

実行が成功しました。

AthenaのQuery editorで履歴を見ると、実行が正常に完了しています。

同QueryをEditorで開くと、データ取得もちゃんと出来ていますね!

参考

以上