AWS Step FunctionsからS3 Bucket内のデータに対するAthenaクエリを実行して結果を取得する(AWS CDK v2)

2022.06.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、AWS Step FunctionsからS3 Bucket内のデータに対するAmazon Athenaのクエリを実行して結果を取得してみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  aws_iam,
  RemovalPolicy,
  Duration,
  Stack,
  StackProps,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';

/**
 * CDK stack that lets a Step Functions state machine run an Athena query
 * against JSON data stored in S3 and return the rows as plain JSON objects.
 *
 * Resources defined here:
 * - an S3 bucket for source data and an S3 bucket for Athena query results
 * - a Glue database and table (partitioned by `date`, with partition
 *   projection enabled) describing the source data
 * - an Athena work group writing results to the result bucket
 * - a state machine that starts the query, polls its state every 10 seconds
 *   while it is QUEUED/RUNNING, fetches the results and converts them
 *
 * The query string is read from the execution input at
 * `$.executionInput.queryString`.
 */
export class ProcessStack extends Stack {
  /**
   * @param scope Parent construct.
   * @param id Logical id of the stack.
   * @param props Optional standard stack properties, forwarded to `Stack`.
   */
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Bucket holding the source data
    const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
      bucketName: `data-${this.account}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Bucket holding Athena query results
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // Data catalog (Glue database)
    const dataCatalog = new glue.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // Data catalog table describing the JSON objects under `data/`
    const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
      tableName: 'source_data_glue_table',
      database: dataCatalog,
      bucket: sourceDataBucket,
      s3Prefix: 'data/',
      partitionKeys: [
        {
          name: 'date',
          type: glue.Schema.STRING,
        },
      ],
      dataFormat: glue.DataFormat.JSON,
      columns: [
        {
          name: 'userId',
          type: glue.Schema.STRING,
        },
        {
          name: 'count',
          type: glue.Schema.INTEGER,
        },
      ],
    });

    // Configure partition projection on the data catalog table by editing the
    // underlying CloudFormation resource. The projection settings are merged
    // into whatever parameters the table construct already set (such as the
    // data-format classification, if present) so those are preserved rather
    // than silently replaced.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const sourceDataCfnTable = sourceDataGlueTable.node.defaultChild as any;
    sourceDataCfnTable.tableInput.parameters = {
      ...sourceDataCfnTable.tableInput.parameters,
      'projection.enabled': true,
      'projection.date.type': 'date',
      'projection.date.range': '2022/06/28,NOW',
      'projection.date.format': 'yyyy/MM/dd',
      'projection.date.interval': 1,
      'projection.date.interval.unit': 'DAYS',
      // '${date}' must reach Athena literally, so it is kept out of the
      // template literal that interpolates the bucket name.
      'storage.location.template':
        `s3://${sourceDataBucket.bucketName}/data/` + '${date}',
    };

    // Athena work group
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Task: start the Athena query execution
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.stringAt(
            '$.executionInput.queryString',
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
        },
      );

    // Task: get the Athena query execution; only its state is kept
    const getAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaGetQueryExecution(
        this,
        'getAthenaQueryExecutionTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            queryState: aws_stepfunctions.JsonPath.stringAt(
              '$.QueryExecution.Status.State',
            ),
          },
          resultPath: '$.getAthenaQueryExecutionTaskOutPut',
        },
      );

    // Wait 10 seconds
    const wait10seconds = new aws_stepfunctions.Wait(this, 'wait10seconds', {
      time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10)),
    });

    // Task: get the Athena query results
    const getAthenaQueryResultsTask =
      new aws_stepfunctions_tasks.AthenaGetQueryResults(
        this,
        'getAthenaQueryResultsTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            // Column metadata, used later to name the fields of each row
            resultColumnInfo: aws_stepfunctions.JsonPath.stringAt(
              '$.ResultSet.ResultSetMetadata.ColumnInfo',
            ),
            // Slice starting at index 1 so the header row is dropped
            resultRaws: aws_stepfunctions.JsonPath.stringAt(
              '$.ResultSet.Rows[1:(@.length-1)]',
            ),
          },
          resultPath: '$.getAthenaQueryResultsTaskOutPut',
        },
      );

    // Task: convert the Athena query results into simple JSON.
    // The formatted expression binds the column info to `cols` and the rows
    // to `rows`, then maps every row to an object keyed by column name;
    // values for which isNaN() is false are converted with Number().
    const convertResultJsonTask =
      new aws_stepfunctions_tasks.EvaluateExpression(
        this,
        'convertResultJsonTask',
        {
          expression: aws_stepfunctions.JsonPath.format(
            'const cols = {}; const rows = {}; {};',
            aws_stepfunctions.JsonPath.stringAt(
              '$.getAthenaQueryResultsTaskOutPut.resultColumnInfo',
            ),
            aws_stepfunctions.JsonPath.stringAt(
              '$.getAthenaQueryResultsTaskOutPut.resultRaws',
            ),
            'rows.map(row => { let j = {}; cols.forEach((col,i) => { const val = row.Data[i].VarCharValue; j[col["Name"]] = isNaN(val) ? val : Number(val); return j; }); return j; })',
          ),
        },
      );

    // Choice: keep waiting until the Athena query has finished.
    // NOTE(review): every state other than QUEUED/RUNNING (not only a
    // successful one) falls through to fetching results; consider routing
    // failed or cancelled queries to a dedicated Fail state.
    const athenaQueryStateChoiceTask = new aws_stepfunctions.Choice(
      this,
      'athenaQueryStateChoiceTask',
    );
    athenaQueryStateChoiceTask.when(
      aws_stepfunctions.Condition.or(
        aws_stepfunctions.Condition.stringEquals(
          aws_stepfunctions.JsonPath.stringAt(
            '$.getAthenaQueryExecutionTaskOutPut.queryState',
          ),
          'QUEUED',
        ),
        aws_stepfunctions.Condition.stringEquals(
          aws_stepfunctions.JsonPath.stringAt(
            '$.getAthenaQueryExecutionTaskOutPut.queryState',
          ),
          'RUNNING',
        ),
      ),
      wait10seconds,
    );
    athenaQueryStateChoiceTask.otherwise(
      getAthenaQueryResultsTask.next(convertResultJsonTask),
    );

    // State machine: start -> wait -> get execution -> choice (loop or finish)
    const stateMachine = new aws_stepfunctions.StateMachine(
      this,
      'stateMachine',
      {
        stateMachineName: 'stateMachine',
        definition: startAthenaQueryExecutionTask
          .next(wait10seconds)
          .next(getAthenaQueryExecutionTask)
          .next(athenaQueryStateChoiceTask),
      },
    );

    // Grant the state machine read access to this stack's Glue Data Catalog
    // resources (the database and table defined above).
    stateMachine.role.addToPrincipalPolicy(
      new aws_iam.PolicyStatement({
        actions: ['glue:Get*', 'glue:List*'],
        effect: aws_iam.Effect.ALLOW,
        resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn],
      }),
    );
  }
}
  • AthenaStartQueryExecutionやAthenaGetQueryExecutionなどのClassによる実装では、DefaultのGlue Data Catalog(TableおよびDatabase)のアクセス権限しか付与されません。よって今回用のGlue Data Catalogリソースの権限を明示的に付与しています。
  • getAthenaQueryResultsTaskタスクではresultSelectorで$.ResultSet.Rows[1:(@.length-1)]を指定することにより、取得結果データのヘッダー行を削除しています。
    • convertResultJsonTaskタスクで使用するカラム情報はresultColumnInfoを使用します。

上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。

Definition

{
  "StartAt": "startAthenaQueryExecutionTask",
  "States": {
    "startAthenaQueryExecutionTask": {
      "Next": "wait10seconds",
      "Type": "Task",
      "ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Parameters": {
        "QueryString.$": "$.executionInput.queryString",
        "ResultConfiguration": {},
        "WorkGroup": "athenaWorkGroup"
      }
    },
    "wait10seconds": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "getAthenaQueryExecutionTask"
    },
    "athenaQueryStateChoiceTask": {
      "Type": "Choice",
      "Choices": [
        {
          "Or": [
            {
              "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
              "StringEquals": "QUEUED"
            },
            {
              "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
              "StringEquals": "RUNNING"
            }
          ],
          "Next": "wait10seconds"
        }
      ],
      "Default": "getAthenaQueryResultsTask"
    },
    "getAthenaQueryExecutionTask": {
      "Next": "athenaQueryStateChoiceTask",
      "Type": "Task",
      "ResultPath": "$.getAthenaQueryExecutionTaskOutPut",
      "ResultSelector": {
        "queryState.$": "$.QueryExecution.Status.State"
      },
      "Resource": "arn:aws:states:::athena:getQueryExecution",
      "Parameters": {
        "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
      }
    },
    "getAthenaQueryResultsTask": {
      "Next": "convertResultJsonTask",
      "Type": "Task",
      "ResultPath": "$.getAthenaQueryResultsTaskOutPut",
      "ResultSelector": {
        "resultColumnInfo.$": "$.ResultSet.ResultSetMetadata.ColumnInfo",
        "resultRaws.$": "$.ResultSet.Rows[1:(@.length-1)]"
      },
      "Resource": "arn:aws:states:::athena:getQueryResults",
      "Parameters": {
        "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
      }
    },
    "convertResultJsonTask": {
      "End": true,
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:ProcessStack-Evalda2d1181604e4a4586941a6abd7fe42dF-YCTnZYU56eCr",
      "Parameters": {
        "expression.$": "States.Format('const cols = {}; const rows = {}; {};', $.getAthenaQueryResultsTaskOutPut.resultColumnInfo, $.getAthenaQueryResultsTaskOutPut.resultRaws, 'rows.map(row => { let j = {}; cols.forEach((col,i) => { const val = row.Data[i].VarCharValue; j[col[\"Name\"]] = isNaN(val) ? val : Number(val); return j; }); return j; })')",
        "expressionAttributeValues": {}
      }
    }
  }
}

動作確認

次のクエリをState Machineから実行してみます。

SELECT * 
FROM "data_catalog"."source_data_glue_table"
WHERE date = '2022/06/29'

同クエリをAthenaのコンソールから実行した様子です。これと同じ結果が取得できることを期待します。

Inputを次のように指定してステートマシンを実行します。

Input

{
  "executionInput": {
    "queryString": "SELECT * FROM \"data_catalog\".\"source_data_glue_table\" WHERE date = '2022/06/29'"
  }
}

実行が成功しました。

State MachineのOutputで期待したデータが取得できています!

Output

[
  {
    "userid": "u001",
    "count": 14,
    "date": "2022/06/29"
  },
  {
    "userid": "u002",
    "count": 10,
    "date": "2022/06/29"
  },
  {
    "userid": "u003",
    "count": 12,
    "date": "2022/06/29"
  }
]

参考

以上