AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみた(AWS CDK v2)

2022.06.27

こんにちは、CX事業本部 IoT事業部の若槻です。

Amazon Athenaを使用すると、SQLライクなクエリ文字列でS3 Bucketなどに格納されているデータを直接分析することが可能です。

今回は、AWS Step FunctionsからAthenaクエリ実行の開始および結果取得をしてみました。

やってみた

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  aws_athena,
  RemovalPolicy,
  Duration,
  Stack,
  StackProps,
} from 'aws-cdk-lib';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // Athenaワークグループ
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Athenaクエリ実行開始
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.stringAt(
            '$.executionInput.queryString',
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
        },
      );

    // Athenaクエリ実行取得
    const getAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaGetQueryExecution(
        this,
        'getAthenaQueryExecutionTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            queryState: aws_stepfunctions.JsonPath.stringAt(
              '$.QueryExecution.Status.State',
            ),
          },
          resultPath: '$.getAthenaQueryExecutionTaskOutPut',
        },
      );

    // 10秒待機
    const wait10seconds = new aws_stepfunctions.Wait(this, 'wait10seconds', {
      time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10)),
    });

    // Athenaクエリ結果取得
    const getAthenaQueryResultsTask =
      new aws_stepfunctions_tasks.AthenaGetQueryResults(
        this,
        'getAthenaQueryResultsTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
        },
      );

    // Athenaクエリが完了するまで待機
    const athenaQueryStateChoiceTask = new aws_stepfunctions.Choice(
      this,
      'athenaQueryStateChoiceTask',
    );
    athenaQueryStateChoiceTask.when(
      aws_stepfunctions.Condition.or(
        aws_stepfunctions.Condition.stringEquals(
          aws_stepfunctions.JsonPath.stringAt(
            '$.getAthenaQueryExecutionTaskOutPut.queryState',
          ),
          'QUEUED',
        ),
        aws_stepfunctions.Condition.stringEquals(
          aws_stepfunctions.JsonPath.stringAt(
            '$.getAthenaQueryExecutionTaskOutPut.queryState',
          ),
          'RUNNING',
        ),
      ),
      wait10seconds,
    );
    athenaQueryStateChoiceTask.otherwise(getAthenaQueryResultsTask);

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: startAthenaQueryExecutionTask
        .next(wait10seconds)
        .next(getAthenaQueryExecutionTask)
        .next(athenaQueryStateChoiceTask),
    });
  }
}
  • Athenaクエリの実行は非同期なので実行開始後に10秒待機し、実行が完了するまで10秒待機および実行結果の確認を繰り返します。

上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。

Definition

{
  "StartAt": "startAthenaQueryExecutionTask",
  "States": {
    "startAthenaQueryExecutionTask": {
      "Next": "wait10seconds",
      "Type": "Task",
      "ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Parameters": {
        "QueryString.$": "$.executionInput.queryString",
        "ResultConfiguration": {},
        "WorkGroup": "athenaWorkGroup"
      }
    },
    "wait10seconds": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "getAthenaQueryExecutionTask"
    },
    "athenaQueryStateChoiceTask": {
      "Type": "Choice",
      "Choices": [
        {
          "Or": [
            {
              "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
              "StringEquals": "QUEUED"
            },
            {
              "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
              "StringEquals": "RUNNING"
            }
          ],
          "Next": "wait10seconds"
        }
      ],
      "Default": "getAthenaQueryResultsTask"
    },
    "getAthenaQueryExecutionTask": {
      "Next": "athenaQueryStateChoiceTask",
      "Type": "Task",
      "ResultPath": "$.getAthenaQueryExecutionTaskOutPut",
      "ResultSelector": {
        "queryState.$": "$.QueryExecution.Status.State"
      },
      "Resource": "arn:aws:states:::athena:getQueryExecution",
      "Parameters": {
        "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
      }
    },
    "getAthenaQueryResultsTask": {
      "End": true,
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:getQueryResults",
      "Parameters": {
        "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
      }
    }
  }
}

動作確認

次のクエリで確認してみます。

SELECT ARRAY [1,2,3,4] AS items

クエリをコンソールで実行した様子です。結果でレコードが1つ取得できています。

次の入力を指定してステートマシンを実行します。

Input

{
  "executionInput": {
    "queryString": "SELECT ARRAY [1,2,3,4] AS items"
  }
}

すると実行が成功しました。

getAthenaQueryResultsTaskのOutputを見ると、クエリ結果が取得できています!

getAthenaQueryResultsTask Output

{
  "ResultSet": {
    "ResultSetMetadata": {
      "ColumnInfo": [
        {
          "CaseSensitive": false,
          "CatalogName": "hive",
          "Label": "items",
          "Name": "items",
          "Nullable": "UNKNOWN",
          "Precision": 0,
          "Scale": 0,
          "SchemaName": "",
          "TableName": "",
          "Type": "array"
        }
      ]
    },
    "Rows": [
      {
        "Data": [
          {
            "VarCharValue": "items"
          }
        ]
      },
      {
        "Data": [
          {
            "VarCharValue": "[1, 2, 3, 4]"
          }
        ]
      }
    ]
  },
  "UpdateCount": 0
}

おわりに

AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみました。難なく実装ができたので流石はStep Functionsと言ったところです。

ただしathena:getQueryResults APIにより取得した実行結果のJson Objectはそのままでは扱いにくい形式なので、取得後にEvaluateExpressionタスクなどで上手いこと変換したいですね。

参考

以上