データアナリティクス事業本部 機械学習チームの鈴木です。
Athenaでクエリを実行するシステムでは、PythonスクリプトやdbtがAthenaへの処理のリクエストを行い、オーケストレーターがその実行制御を行うことが多いと思います。この場合、オーケストレーターとの間になにかしらのコンピュートが挟まることとなり、その管理が必要になります。
AWS Step Functionsはサーバーレスオーケストレーションを提供するサービスですが、AWS SDK統合を使用すると、ほぼすべてのAWSのサービスのAPIを呼び出すことができます。
この仕組みを使うと、Step FunctionsだけでS3バケットにアップロードしたSQLファイルのクエリをAthenaで実行することができるので試してみました。
やりたいこと
Step Functionsのステートマシンから、S3に配置したSQLファイルを取得し、その中身に書いてあるクエリをAthenaで実行したいです。
構成は以下のものを考えました。
Step FunctionsではStartQueryExecution
アクションのQueryString
にクエリを渡すことにより、直接Athenaにクエリ実行をリクエストすることができます。Step FunctionsではS3バケットのファイルを取得できるため、先にS3バケットのオブジェクトを取得するステートを用意し、読み込んだクエリをQueryString
に指定することで、S3に配置したSQLファイルを実行することができます。
上記の構成例では、ステートマシンを2つに分けていますが、分けた方が綺麗かなと思っただけなので必ずしも分ける必要はありません。
環境とコード
環境
AWS CDK v2にてリソースを作成しました。バージョンは2.133.0
を使用しました。
クエリの配置
S3バケットに以下のクエリを配置しました。
sample.sql
SELECT *
FROM "cm_nayuts"."iris";
事前にcm_nayuts
データベースにiris
テーブルを作成しておきました。バケット名
・ファイルのあるS3パス
はアップロードしたファイルに合わせて変更しました。
irisテーブルのDDL
CREATE EXTERNAL TABLE `iris`(
`sepal_length` double,
`sepal_width` double,
`petal_length` double,
`petal_width` double,
`species` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://バケット名/ファイルのあるS3パス'
TBLPROPERTIES (
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'skip.header.line.count'='1')
テーブルが参照しているS3バケットのパスには、UCI Machine Learning RepositoryのIris Data Setを配置しておきました。
このデータセットは、下記リンクにて公開されています。
- https://archive.ics.uci.edu/ml/datasets/iris
コード
以下のようにスタックを定義しました。
lib/sfn_sql-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_iam,
aws_stepfunctions,
aws_stepfunctions_tasks,
aws_athena,
RemovalPolicy,
Duration,
Stack,
StackProps,
} from 'aws-cdk-lib';
export class ExecAthenaQueryWithS3SQLStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
//-------------------AthenaおよびS3の定義--------------------------
// Athenaクエリ結果格納バケット
const athenaQueryResultBucket = new aws_s3.Bucket(
this,
'athenaQueryResultBucket',
{
bucketName: `athena-query-result-${this.account}`,
removalPolicy: RemovalPolicy.DESTROY,
},
);
// Athenaワークグループ
const athenaWorkGroup = new aws_athena.CfnWorkGroup(
this,
'athenaWorkGroup',
{
name: this.node.tryGetContext('athena_work_group_name'),
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
},
},
},
);
//-------------------Athenaへのクエリ実行用のステートマシンの定義--------------------------
// Athenaクエリ実行開始
const startAthenaQueryExecutionTask =
new aws_stepfunctions_tasks.AthenaStartQueryExecution(
this,
'startAthenaQueryExecutionTask',
{
queryString: aws_stepfunctions.JsonPath.stringAt(
'$.getObjectTaskOutPut.body',
),
workGroup: athenaWorkGroup.name,
resultPath: '$.startAthenaQueryExecutionTaskOutPut',
integrationPattern: aws_stepfunctions.IntegrationPattern.RUN_JOB,
},
);
// ステートマシンの定義
const execAthenaQueryStateMachine = new aws_stepfunctions.StateMachine(this, 'execAthenaQueryStateMachine', {
stateMachineName: 'ExecAthenaQueryStateMachine',
definition: startAthenaQueryExecutionTask
.next(new aws_stepfunctions.Succeed(this, "FinishQuery")),
});
//SQL実行のための追加ポリシー
execAthenaQueryStateMachine.addToRolePolicy(new aws_iam.PolicyStatement({
actions: ['glue:*'],
resources: [
`arn:aws:glue:ap-northeast-1:${this.account}:database/*`,
`arn:aws:glue:ap-northeast-1:${this.account}:table/*`
],
}));
//---------S3からSQLファイルを取得し、クエリ実行用のステートマシンを起動するステートマシンの定義-----------
// SQLファイルの取得
const getObjectTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'getObjectTask',
{
service: 's3',
action: 'getObject',
parameters: {
Bucket: this.node.tryGetContext('sql_bucket_name'),
Key: this.node.tryGetContext('sql_file_path'),
},
iamResources: [`arn:aws:s3:::${this.node.tryGetContext('sql_bucket_name')}/*`],
iamAction: 's3:GetObject',
resultSelector: {
body: aws_stepfunctions.JsonPath.stringAt('$.Body'),
},
resultPath: '$.getObjectTaskOutPut',
},
);
// ステートマシンの定義
new aws_stepfunctions.StateMachine(this, 'startStateMachineWithS3SQL', {
stateMachineName: 'StartStateMachineWithS3SQL',
definition: getObjectTask
.next(
new aws_stepfunctions_tasks.StepFunctionsStartExecution(
this,
"StartExecAthenaQueryStateMachine",
{
stateMachine: execAthenaQueryStateMachine,
inputPath: "$",
input: aws_stepfunctions.TaskInput.fromObject({"getObjectTaskOutPut": aws_stepfunctions.JsonPath.stringAt('$.getObjectTaskOutPut')}),
associateWithParent: true,
integrationPattern: aws_stepfunctions.IntegrationPattern.RUN_JOB
}
)
)
.next(new aws_stepfunctions.Succeed(this, "FinishStateMachine")),
});
}
}
sql_bucket_name
・sql_file_path
・athena_work_group_name
は自身の環境のものをcdk.json
に記載しておきました。
なお、sql_bucket_name
・sql_file_path
はSQLファイルをアップロードしたS3バケット名とパス、athena_work_group_name
は今回はsfnAthenaWorkGroup
としました。
ポイント
実装のポイントについてご紹介します。
1. ステートマシンの入れ子化
Athenaへのクエリ実行のためのステートマシンは使い回しするかなと思ったので、S3バケットからSQLファイルを取得するステートマシンとは別に作成し、入れ子にしました。
同期的に実行する入れ子のステートマシンの作成については以前に以下の記事で紹介しました。
なお、同期実行は非同期実行と比べて、ステートマシンのIAMロールへの権限付与が追加で必要になるため、CDKに作成してもらうのがおすすめです。
2. S3バケットからのSQLファイルの取得
CallAwsServiceにて、S3バケットからSQLファイルをs3:GetObject
アクションで取得し、本文を保持する形としました。
この使い方に注目した解説としては、以下の記事に記載があります。
今回紹介したサンプルコードでは、S3バケットやオブジェクトパスについてはハードコードしてしまっていますが、入力でステートマシンに渡せるようにしておくとより良いかもしれません。
3. Athenaへの同期的なクエリのリクエスト
StartQueryExecution
では.sync
統合パターンがサポートされているため、これを使いました。
AWS CDKの場合は、integrationPattern
にRUN_JOB
を指定するとデプロイされるステートマシンでarn:aws:states:::athena:startQueryExecution.sync
とステートに反映されるためご確認ください。
AthenaGetQueryExecutionにてクエリのステータスを確認し、Choice
でQUEUED
とRUNNING
の場合は再度待機し、それ以外の場合は完了するループを作ってもよいですが、こちらの方が簡単ですね。
今回紹介したサンプルコードでは、クエリが失敗するとステートマシンは失敗します。
4. ステートマシンにGlueデータベース・テーブルへの権限を付与する
CDKの場合、デフォルトで自動的にステートマシン向けのIAMロールを生成してくれるので大変便利なのですが、Glueデータベース・テーブルへのアクセス権限は含まれていない場合があります。これはS3バケットから取得したSQLファイルに書かれているクエリが、どのGlueデータベース・テーブルへのものかまではCDKの定義からは分からないためです。
今回のサンプルコードでは、クエリをリクエストする方のステートマシンについて、StateMachineクラスのaddToRolePolicy
で操作に十分なIAMポリシーを追加しました。この例ではかなり強めの権限を付与しているので、本番運用する場合は対象となるデータベース・テーブルに絞ることをおすすめします。
実行結果例
SQLファイルを取得し、ステートマシンを起動するステートマシン
S3バケットからのSQLファイルの取得およびAthenaへのクエリを実行するステートマシンの起動が成功しました。
Athenaへのクエリを実行するステートマシン
呼び出し側から受け取ったクエリの内容を同期的にAthenaにリクエストしました。結果もステートの出力に出ていることが分かります。
リクエストしたクエリは以下のように成功していました。
最後に
Step FunctionsだけでS3バケットにアップロードしたSQLファイルのクエリをAthenaで実行する構成例をご紹介しました。
まずは小さめの仕組みでいいので手軽にクエリの定期実行を初めてみたい場合にやりやすい構成だと思います。
一方でS3で大量のSQLファイルを管理するのは結構大変ではあると思うので、軌道に乗ってきたらCI/CDパイプラインでステートマシンの更新やSQLファイルの再配置を自動化するか、dbtなどのソフトウェアの恩恵を受けるとよいかもしれません。