AWS Step FunctionsからS3 Bucket内のデータに対するAthenaクエリを実行して結果を取得する(AWS CDK v2)
こんにちは、CX事業本部 IoT事業部の若槻です。
今回は、AWS Step FunctionsからS3 Bucket内のデータに対するAmazon Athenaのクエリを実行して結果を取得してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  aws_iam,
  RemovalPolicy,
  Duration,
  Stack,
  StackProps,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';

/**
 * Stack that wires up everything needed to run an Athena query from
 * Step Functions and return the rows as plain JSON objects:
 *
 * - an S3 bucket for source data and another for Athena query results,
 * - a Glue database and a JSON table (partitioned by `date`, with partition
 *   projection configured),
 * - an Athena work group writing to the result bucket,
 * - a state machine that starts the query, polls its state every 10 seconds
 *   while it is QUEUED/RUNNING, fetches the results and converts them.
 *
 * The query string is read from the execution input at
 * `$.executionInput.queryString`.
 */
export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // Bucket that stores the source data.
    const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
      bucketName: `data-${this.account}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Bucket that stores Athena query results.
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // Glue database (data catalog).
    const dataCatalog = new glue.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // Glue table over the JSON objects under `data/` in the source bucket,
    // partitioned by a string `date` key.
    const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
      tableName: 'source_data_glue_table',
      database: dataCatalog,
      bucket: sourceDataBucket,
      s3Prefix: 'data/',
      partitionKeys: [
        {
          name: 'date',
          type: glue.Schema.STRING,
        },
      ],
      dataFormat: glue.DataFormat.JSON,
      columns: [
        {
          name: 'userId',
          type: glue.Schema.STRING,
        },
        {
          name: 'count',
          type: glue.Schema.INTEGER,
        },
      ],
    });

    // Partition projection settings for the table, applied through the
    // underlying L1 resource (escape hatch).
    // The projection keys are merged into the parameters already present on
    // `tableInput` instead of replacing the whole object, so that entries set
    // by the glue.Table construct itself are not silently discarded.
    // NOTE(review): which parameters the construct sets (e.g. a data-format
    // classification) depends on the installed aws-glue-alpha version — confirm.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const cfnSourceDataTable = sourceDataGlueTable.node.defaultChild as any;
    cfnSourceDataTable.tableInput.parameters = {
      ...cfnSourceDataTable.tableInput.parameters,
      'projection.enabled': true,
      'projection.date.type': 'date',
      'projection.date.range': '2022/06/28,NOW',
      'projection.date.format': 'yyyy/MM/dd',
      'projection.date.interval': 1,
      'projection.date.interval.unit': 'DAYS',
      // The trailing '${date}' is a plain string on purpose: it must reach
      // Glue as a literal placeholder, not be interpolated by TypeScript.
      'storage.location.template':
        `s3://${sourceDataBucket.bucketName}/data/` + '${date}',
    };

    // Athena work group whose results go to the result bucket.
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Task: start the Athena query given in the execution input.
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.stringAt(
            '$.executionInput.queryString',
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
        },
      );

    // Task: read the query execution and keep only its state.
    const getAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaGetQueryExecution(
        this,
        'getAthenaQueryExecutionTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            queryState: aws_stepfunctions.JsonPath.stringAt(
              '$.QueryExecution.Status.State',
            ),
          },
          resultPath: '$.getAthenaQueryExecutionTaskOutPut',
        },
      );

    // Wait 10 seconds between polls.
    const wait10seconds = new aws_stepfunctions.Wait(this, 'wait10seconds', {
      time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10)),
    });

    // Task: fetch the query results. The result selector keeps the column
    // metadata and a slice of the rows starting at index 1, which is intended
    // to drop the header row.
    const getAthenaQueryResultsTask =
      new aws_stepfunctions_tasks.AthenaGetQueryResults(
        this,
        'getAthenaQueryResultsTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            resultColumnInfo: aws_stepfunctions.JsonPath.stringAt(
              '$.ResultSet.ResultSetMetadata.ColumnInfo',
            ),
            resultRaws: aws_stepfunctions.JsonPath.stringAt(
              '$.ResultSet.Rows[1:(@.length-1)]',
            ),
          },
          resultPath: '$.getAthenaQueryResultsTaskOutPut',
        },
      );

    // Task: convert the Athena result set into an array of simple objects
    // keyed by column name.
    // NOTE(review): the `isNaN(val) ? val : Number(val)` coercion converts every
    // value that parses as a number, so '' becomes 0 and numeric-looking strings
    // lose their original form — consider using the column type info instead.
    const convertResultJsonTask =
      new aws_stepfunctions_tasks.EvaluateExpression(
        this,
        'convertResultJsonTask',
        {
          expression: aws_stepfunctions.JsonPath.format(
            'const cols = {}; const rows = {}; {};',
            aws_stepfunctions.JsonPath.stringAt(
              '$.getAthenaQueryResultsTaskOutPut.resultColumnInfo',
            ),
            aws_stepfunctions.JsonPath.stringAt(
              '$.getAthenaQueryResultsTaskOutPut.resultRaws',
            ),
            'rows.map(row => { let j = {}; cols.forEach((col,i) => { const val = row.Data[i].VarCharValue; j[col["Name"]] = isNaN(val) ? val : Number(val); return j; }); return j; })',
          ),
        },
      );

    // Choice: keep polling while the query is QUEUED or RUNNING.
    // NOTE(review): every other state — including FAILED and CANCELLED — falls
    // through to getAthenaQueryResultsTask; consider an explicit SUCCEEDED
    // branch plus a Fail state for the remaining cases.
    const athenaQueryStateChoiceTask = new aws_stepfunctions.Choice(
      this,
      'athenaQueryStateChoiceTask',
    );
    athenaQueryStateChoiceTask.when(
      aws_stepfunctions.Condition.or(
        aws_stepfunctions.Condition.stringEquals(
          aws_stepfunctions.JsonPath.stringAt(
            '$.getAthenaQueryExecutionTaskOutPut.queryState',
          ),
          'QUEUED',
        ),
        aws_stepfunctions.Condition.stringEquals(
          aws_stepfunctions.JsonPath.stringAt(
            '$.getAthenaQueryExecutionTaskOutPut.queryState',
          ),
          'RUNNING',
        ),
      ),
      wait10seconds,
    );
    athenaQueryStateChoiceTask.otherwise(
      getAthenaQueryResultsTask.next(convertResultJsonTask),
    );

    // State machine: start query -> wait -> get state -> choice (loop or finish).
    const stateMachine = new aws_stepfunctions.StateMachine(
      this,
      'stateMachine',
      {
        stateMachineName: 'stateMachine',
        definition: startAthenaQueryExecutionTask
          .next(wait10seconds)
          .next(getAthenaQueryExecutionTask)
          .next(athenaQueryStateChoiceTask),
      },
    );

    // Grant the state machine role read access (glue:Get*, glue:List*) to the
    // Glue database and table created above.
    stateMachine.role.addToPrincipalPolicy(
      new aws_iam.PolicyStatement({
        actions: ['glue:Get*', 'glue:List*'],
        effect: aws_iam.Effect.ALLOW,
        resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn],
      }),
    );
  }
}
AthenaStartQueryExecution
やAthenaGetQueryExecution
Classによる実装では、DefaultのGlue Data Catalog(TableおよびDatabase)へのアクセス権限しか付与されません。よって今回作成したGlue Data Catalogリソースへのアクセス権限を明示的に付与しています。getAthenaQueryResultsTask
タスクではresultSelector
で$.ResultSet.Rows[1:(@.length-1)]
とすることにより、取得結果データのヘッダー行を削除しています。convertResultJsonTask
タスクで使用するカラム情報はresultColumnInfo
を使用します。
上記のスタックをCDK Deployでデプロイします。すると次のようなDefinitionのState Machineが作成されます。
{ "StartAt": "startAthenaQueryExecutionTask", "States": { "startAthenaQueryExecutionTask": { "Next": "wait10seconds", "Type": "Task", "ResultPath": "$.startAthenaQueryExecutionTaskOutPut", "Resource": "arn:aws:states:::athena:startQueryExecution", "Parameters": { "QueryString.$": "$.executionInput.queryString", "ResultConfiguration": {}, "WorkGroup": "athenaWorkGroup" } }, "wait10seconds": { "Type": "Wait", "Seconds": 10, "Next": "getAthenaQueryExecutionTask" }, "athenaQueryStateChoiceTask": { "Type": "Choice", "Choices": [ { "Or": [ { "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState", "StringEquals": "QUEUED" }, { "Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState", "StringEquals": "RUNNING" } ], "Next": "wait10seconds" } ], "Default": "getAthenaQueryResultsTask" }, "getAthenaQueryExecutionTask": { "Next": "athenaQueryStateChoiceTask", "Type": "Task", "ResultPath": "$.getAthenaQueryExecutionTaskOutPut", "ResultSelector": { "queryState.$": "$.QueryExecution.Status.State" }, "Resource": "arn:aws:states:::athena:getQueryExecution", "Parameters": { "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId" } }, "getAthenaQueryResultsTask": { "Next": "convertResultJsonTask", "Type": "Task", "ResultPath": "$.getAthenaQueryResultsTaskOutPut", "ResultSelector": { "resultColumnInfo.$": "$.ResultSet.ResultSetMetadata.ColumnInfo", "resultRaws.$": "$.ResultSet.Rows[1:(@.length-1)]" }, "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId" } }, "convertResultJsonTask": { "End": true, "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:ProcessStack-Evalda2d1181604e4a4586941a6abd7fe42dF-YCTnZYU56eCr", "Parameters": { "expression.$": "States.Format('const cols = {}; const rows = {}; {};', $.getAthenaQueryResultsTaskOutPut.resultColumnInfo, $.getAthenaQueryResultsTaskOutPut.resultRaws, 'rows.map(row => { let j = {}; 
cols.forEach((col,i) => { const val = row.Data[i].VarCharValue; j[col[\"Name\"]] = isNaN(val) ? val : Number(val); return j; }); return j; })')", "expressionAttributeValues": {} } } } }
動作確認
次のクエリをState Machineから実行してみます。
SELECT * FROM "data_catalog"."source_data_glue_table" WHERE date = '2022/06/29'
同クエリをAthenaのコンソールから実行した様子です。これと同じ結果が取得できることを期待します。
Inputを次のように指定してステートマシンを実行します。
{ "executionInput": { "queryString": "SELECT * FROM \"data_catalog\".\"source_data_glue_table\" WHERE date = '2022/06/29'" } }
実行が成功しました。
State MachineのOutputで期待したデータが取得できています!
[ { "userid": "u001", "count": 14, "date": "2022/06/29" }, { "userid": "u002", "count": 10, "date": "2022/06/29" }, { "userid": "u003", "count": 12, "date": "2022/06/29" } ]
参考
以上