Step Functionsだけで、S3バケットにアップロードしたSQLファイルのクエリをAthenaで実行する

Step FunctionsのAWS SDK統合を使って、S3バケットにあるSQLファイルを取得すれば、Lambdaなどを挟まなくてもAthenaにクエリ実行をリクエストすることができます。
2024.03.25

データアナリティクス事業本部 機械学習チームの鈴木です。

Athenaでクエリを実行するシステムでは、PythonスクリプトやdbtがAthenaへの処理のリクエストを行い、オーケストレーターがその実行制御を行うことが多いと思います。この場合、オーケストレーターとの間になにかしらのコンピュートが挟まることとなり、その管理が必要になります。

AWS Step Functionsはサーバーレスオーケストレーションを提供するサービスですが、AWS SDK統合を使用すると、ほぼすべてのAWSのサービスのAPIを呼び出すことができます。

この仕組みを使うと、Step FunctionsだけでS3バケットにアップロードしたSQLファイルのクエリをAthenaで実行することができるので試してみました。

やりたいこと

Step Functionsのステートマシンから、S3に配置したSQLファイルを取得し、その中身に書いてあるクエリをAthenaで実行したいです。

構成は以下のものを考えました。

構成例

Step FunctionsではStartQueryExecutionアクションのQueryStringにクエリを渡すことにより、直接Athenaにクエリ実行をリクエストすることができます。Step FunctionsではS3バケットのファイルを取得できるため、先にS3バケットのオブジェクトを取得するステートを用意し、読み込んだクエリをQueryStringに指定することで、S3に配置したSQLファイルを実行することができます。

上記の構成例では、ステートマシンを2つに分けていますが、分けた方が綺麗かなと思っただけなので必ずしも分ける必要はありません。

環境とコード

環境

AWS CDK v2にてリソースを作成しました。バージョンは2.133.0を使用しました。

クエリの配置

S3バケットに以下のクエリを配置しました。

sample.sql

SELECT *
FROM "cm_nayuts"."iris";

事前にcm_nayutsデータベースにirisテーブルを作成しておきました。バケット名ファイルのあるS3パスはアップロードしたファイルに合わせて変更しました。

irisテーブルのDDL

CREATE EXTERNAL TABLE `iris`(
  `sepal_length` double, 
  `sepal_width` double, 
  `petal_length` double, 
  `petal_width` double, 
  `species` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://バケット名/ファイルのあるS3パス'
TBLPROPERTIES (
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',', 
  'skip.header.line.count'='1')

テーブルが参照しているS3バケットのパスには、UCI Machine Learning RepositoryのIris Data Setを配置しておきました。

このデータセットは、下記リンクにて公開されています。

  • https://archive.ics.uci.edu/ml/datasets/iris

コード

以下のようにスタックを定義しました。

lib/sfn_sql-stack.ts

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_iam,
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  aws_athena,
  RemovalPolicy,
  Duration,
  Stack,
  StackProps,
} from 'aws-cdk-lib';

export class ExecAthenaQueryWithS3SQLStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    //-------------------AthenaおよびS3の定義--------------------------
    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // Athenaワークグループ
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: this.node.tryGetContext('athena_work_group_name'),
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    //-------------------Athenaへのクエリ実行用のステートマシンの定義--------------------------
    // Athenaクエリ実行開始
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.stringAt(
            '$.getObjectTaskOutPut.body',
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
          integrationPattern: aws_stepfunctions.IntegrationPattern.RUN_JOB,
        },
      );

    // ステートマシンの定義
    const execAthenaQueryStateMachine = new aws_stepfunctions.StateMachine(this, 'execAthenaQueryStateMachine', {
      stateMachineName: 'ExecAthenaQueryStateMachine',
      definition: startAthenaQueryExecutionTask
          .next(new aws_stepfunctions.Succeed(this, "FinishQuery")),
    });

    //SQL実行のための追加ポリシー
    execAthenaQueryStateMachine.addToRolePolicy(new aws_iam.PolicyStatement({
        actions: ['glue:*'],
        resources: [
          `arn:aws:glue:ap-northeast-1:${this.account}:database/*`,
          `arn:aws:glue:ap-northeast-1:${this.account}:table/*`
        ],
    }));


    //---------S3からSQLファイルを取得し、クエリ実行用のステートマシンを起動するステートマシンの定義-----------
    // SQLファイルの取得
    const getObjectTask = new aws_stepfunctions_tasks.CallAwsService(
      this,
      'getObjectTask',
      {
        service: 's3',
        action: 'getObject',
        parameters: {
          Bucket: this.node.tryGetContext('sql_bucket_name'),
          Key: this.node.tryGetContext('sql_file_path'),
        },
        iamResources: [`arn:aws:s3:::${this.node.tryGetContext('sql_bucket_name')}/*`],
        iamAction: 's3:GetObject',
        resultSelector: {
          body: aws_stepfunctions.JsonPath.stringAt('$.Body'),
        },
        resultPath: '$.getObjectTaskOutPut',
      },
    );

    // ステートマシンの定義
    new aws_stepfunctions.StateMachine(this, 'startStateMachineWithS3SQL', {
      stateMachineName: 'StartStateMachineWithS3SQL',
      definition: getObjectTask
          .next(
            new aws_stepfunctions_tasks.StepFunctionsStartExecution(
              this,
              "StartExecAthenaQueryStateMachine",
              {
                stateMachine: execAthenaQueryStateMachine,
                inputPath: "$",
                input: aws_stepfunctions.TaskInput.fromObject({"getObjectTaskOutPut": aws_stepfunctions.JsonPath.stringAt('$.getObjectTaskOutPut')}),
                associateWithParent: true,
                integrationPattern: aws_stepfunctions.IntegrationPattern.RUN_JOB
              }
            )
          )
          .next(new aws_stepfunctions.Succeed(this, "FinishStateMachine")),
    });

  }
}

sql_bucket_namesql_file_pathathena_work_group_nameは自身の環境のものをcdk.jsonに記載しておきました。

なお、sql_bucket_namesql_file_pathはSQLファイルをアップロードしたS3バケット名とパス、athena_work_group_nameは今回はsfnAthenaWorkGroupとしました。

ポイント

実装のポイントについてご紹介します。

1. ステートマシンの入れ子化

Athenaへのクエリ実行のためのステートマシンは使い回しするかなと思ったので、S3バケットからSQLファイルを取得するステートマシンとは別に作成し、入れ子にしました。

同期的に実行する入れ子のステートマシンの作成については以前に以下の記事で紹介しました。

なお、同期実行は非同期実行と比べて、ステートマシンのIAMロールへの権限付与が追加で必要になるため、CDKに作成してもらうのがおすすめです。

2. S3バケットからのSQLファイルの取得

CallAwsServiceにて、S3バケットからSQLファイルをs3:GetObjectアクションで取得し、本文を保持する形としました。

この使い方に注目した解説としては、以下の記事に記載があります。

今回紹介したサンプルコードでは、S3バケットやオブジェクトパスについてはハードコードしてしまっていますが、入力でステートマシンに渡せるようにしておくとより良いかもしれません。

3. Athenaへの同期的なクエリのリクエスト

StartQueryExecutionでは.sync統合パターンがサポートされているため、これを使いました。

AWS CDKの場合は、integrationPatternRUN_JOBを指定するとデプロイされるステートマシンでarn:aws:states:::athena:startQueryExecution.syncとステートに反映されるためご確認ください。

AthenaGetQueryExecutionにてクエリのステータスを確認し、ChoiceQUEUEDRUNNINGの場合は再度待機し、それ以外の場合は完了するループを作ってもよいですが、こちらの方が簡単ですね。

今回紹介したサンプルコードでは、クエリが失敗するとステートマシンは失敗します。

4. ステートマシンにGlueデータベース・テーブルへの権限を付与する

CDKの場合、デフォルトで自動的にステートマシン向けのIAMロールを生成してくれるので大変便利なのですが、Glueデータベース・テーブルへのアクセス権限は含まれていない場合があります。これはS3バケットから取得したSQLファイルに書かれているクエリが、どのGlueデータベース・テーブルへのものかまではCDKの定義からは分からないためです。

今回のサンプルコードでは、クエリをリクエストする方のステートマシンについて、StateMachineクラスのaddToRolePolicyで操作に十分なIAMポリシーを追加しました。この例ではかなり強めの権限を付与しているので、本番運用する場合は対象となるデータベース・テーブルに絞ることをおすすめします。

実行結果例

SQLファイルを取得し、ステートマシンを起動するステートマシン

S3バケットからのSQLファイルの取得およびAthenaへのクエリを実行するステートマシンの起動が成功しました。

外側のステートマシン

Athenaへのクエリを実行するステートマシン

呼び出し側から受け取ったクエリの内容を同期的にAthenaにリクエストしました。結果もステートの出力に出ていることが分かります。

内側のステートマシン

リクエストしたクエリは以下のように成功していました。

クエリヒストリーの結果

最後に

Step FunctionsだけでS3バケットにアップロードしたSQLファイルのクエリをAthenaで実行する構成例をご紹介しました。

まずは小さめの仕組みでいいので手軽にクエリの定期実行を初めてみたい場合にやりやすい構成だと思います。

一方でS3で大量のSQLファイルを管理するのは結構大変ではあると思うので、軌道に乗ってきたらCI/CDパイプラインでステートマシンの更新やSQLファイルの再配置を自動化するか、dbtなどのソフトウェアの恩恵を受けるとよいかもしれません。