AWS Step FunctionsからAmazon Athena Parameterized Queriesを実行する構成を作ってみた(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
前回のエントリでは、Amazon Athena Parameterized QueriesをCDKで作ってみました。
今回はその発展編として、Amazon Athena Parameterized QueriesをAWS Step Functionsから実行する構成をAWS CDKで作ってみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_s3, aws_athena, aws_stepfunctions_tasks, aws_stepfunctions, RemovalPolicy, Stack, StackProps, aws_iam, } from 'aws-cdk-lib'; import * as glue from '@aws-cdk/aws-glue-alpha'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // ソースデータ格納バケット const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', { bucketName: `data-${this.account}`, removalPolicy: RemovalPolicy.DESTROY, }); // Athenaクエリ結果格納バケット const athenaQueryResultBucket = new aws_s3.Bucket( this, 'athenaQueryResultBucket', { bucketName: `athena-query-result-${this.account}`, removalPolicy: RemovalPolicy.DESTROY, }, ); // データカタログ const dataCatalog = new glue.Database(this, 'dataCatalog', { databaseName: 'data_catalog', }); // データカタログテーブル const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', { tableName: 'source_data_glue_table', database: dataCatalog, bucket: sourceDataBucket, s3Prefix: 'data/', partitionKeys: [ { name: 'date', type: glue.Schema.STRING, }, ], dataFormat: glue.DataFormat.JSON, columns: [ { name: 'userId', type: glue.Schema.STRING, }, { name: 'count', type: glue.Schema.INTEGER, }, ], }); // データカタログテーブルへのPartition Projectionの設定 // eslint-disable-next-line @typescript-eslint/no-explicit-any (sourceDataGlueTable.node.defaultChild as any).tableInput.parameters = { 'projection.enabled': true, 'projection.date.type': 'date', 'projection.date.range': '2022/06/20,NOW', 'projection.date.format': 'yyyy/MM/dd', 'projection.date.interval': 1, 'projection.date.interval.unit': 'DAYS', 'storage.location.template': `s3://${sourceDataBucket.bucketName}/data/` + '${date}', }; // Athenaワークグループ const athenaWorkGroup = new aws_athena.CfnWorkGroup( this, 'athenaWorkGroup', { name: 'athenaWorkGroup', workGroupConfiguration: { resultConfiguration: { outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`, }, }, }, ); // Prepared Queries const athenaPreparedStatement = new aws_athena.CfnPreparedStatement( this, 'athenaPreparedStatement', { statementName: 'athenaPreparedStatement', workGroup: athenaWorkGroup.name, queryStatement: ` SELECT * FROM ${dataCatalog.databaseName}.${sourceDataGlueTable.tableName} WHERE date = ? LIMIT 10`, }, ); // Athenaクエリ実行開始 const startAthenaQueryExecutionTask = new aws_stepfunctions_tasks.AthenaStartQueryExecution( this, 'startAthenaQueryExecutionTask', { queryString: aws_stepfunctions.JsonPath.format( `EXECUTE ${athenaPreparedStatement.statementName} USING '{}'`, aws_stepfunctions.JsonPath.stringAt('$.query_dt'), ), workGroup: athenaWorkGroup.name, resultPath: '$.startAthenaQueryExecutionTaskOutPut', }, ); // ステートマシン const stateMachine = new aws_stepfunctions.StateMachine( this, 'stateMachine', { stateMachineName: 'stateMachine', definition: startAthenaQueryExecutionTask, }, ); // Glue Data Catalogリソースへのアクセス権をState Machineに追加 stateMachine.role.addToPrincipalPolicy( new aws_iam.PolicyStatement({ actions: ['glue:Get*', 'glue:List*'], effect: aws_iam.Effect.ALLOW, resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn], }), ); // PreparedStatementを実行するためのアクセス権をState Machineに追加 stateMachine.addToRolePolicy( new aws_iam.PolicyStatement({ actions: ['athena:GetPreparedStatement'], effect: aws_iam.Effect.ALLOW, resources: [ `arn:aws:athena:${this.region}:${this.account}:workgroup/${athenaWorkGroup.name}`, ], }), ); } }
AthenaStartQueryExecution
Taskで、Query StringEXECUTE ${athenaPreparedStatement.statementName} USING '{}'
によりParameterized Queriesを実行しています。- この時、文字列型のParameter値の指定は
'{}'
のようにシングルクォートで囲む必要があります。
- この時、文字列型のParameter値の指定は
- State MachineはWorkgroupに対して
athena:GetPreparedStatement
Actionを行うので、Policyを付与します。
上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。
{ "StartAt": "startAthenaQueryExecutionTask", "States": { "startAthenaQueryExecutionTask": { "End": true, "Type": "Task", "ResultPath": "$.startAthenaQueryExecutionTaskOutPut", "Resource": "arn:aws:states:::athena:startQueryExecution", "Parameters": { "QueryString.$": "States.Format('EXECUTE athenaPreparedStatement USING {}', $.query_dt)", "ResultConfiguration": {}, "WorkGroup": "athenaWorkGroup" } } } }
動作確認
次の入力を指定してステートマシンを実行します。
{ "query_dt": "2022/06/28" }
実行が成功しました。
AthenaのQuery editorで履歴を見ると、実行が正常に完了しています。
同QueryをEditorで開くと、データ取得もちゃんと出来ていますね!
参考
- AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみた(AWS CDK v2) | DevelopersIO
- AWS Step FunctionsからS3 Bucket内のデータに対するAthenaクエリを実行して結果を取得する(AWS CDK v2) | DevelopersIO
以上