こんにちは、CX事業本部 IoT事業部の若槻です。
今回は、AWS Step FunctionsからS3 Bucket内のデータに対するAmazon Athenaのクエリを実行して結果を取得してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_athena,
aws_stepfunctions,
aws_stepfunctions_tasks,
aws_iam,
RemovalPolicy,
Duration,
Stack,
StackProps,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';
/**
 * CDK stack that runs an Amazon Athena query from an AWS Step Functions state
 * machine against JSON data stored in S3, and returns the result rows as plain
 * JSON objects keyed by column name.
 *
 * The state machine reads the SQL to execute from `$.executionInput.queryString`
 * in its execution input.
 */
export class ProcessStack extends Stack {
  /**
   * @param scope - Parent construct.
   * @param id - Construct ID of this stack.
   * @param props - Standard stack properties (optional).
   */
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Bucket holding the source data that Athena queries.
    // autoDeleteObjects empties the bucket when it is removed; without it,
    // RemovalPolicy.DESTROY cannot delete a bucket that still contains objects.
    const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
      bucketName: `data-${this.account}`,
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // Bucket receiving Athena query results (non-empty after any query run,
    // so it also needs autoDeleteObjects for DESTROY to succeed).
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
        autoDeleteObjects: true,
      },
    );

    // Glue Data Catalog database.
    const dataCatalog = new glue.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // Glue table over the JSON objects under the `data/` prefix of the source
    // bucket, partitioned by a string `date` key.
    const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
      tableName: 'source_data_glue_table',
      database: dataCatalog,
      bucket: sourceDataBucket,
      s3Prefix: 'data/',
      partitionKeys: [
        {
          name: 'date',
          type: glue.Schema.STRING,
        },
      ],
      dataFormat: glue.DataFormat.JSON,
      columns: [
        {
          name: 'userId',
          type: glue.Schema.STRING,
        },
        {
          name: 'count',
          type: glue.Schema.INTEGER,
        },
      ],
    });

    // Enable Partition Projection on the table so partitions of the form
    // data/yyyy/MM/dd are resolved without registering them in the catalog.
    // The parameters already rendered by the Table construct are merged in
    // rather than replaced. Glue table parameters are a string-to-string map,
    // so every value is given as a string.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const cfnSourceDataTable = sourceDataGlueTable.node.defaultChild as any;
    cfnSourceDataTable.tableInput.parameters = {
      ...cfnSourceDataTable.tableInput.parameters,
      'projection.enabled': 'true',
      'projection.date.type': 'date',
      'projection.date.range': '2022/06/28,NOW',
      'projection.date.format': 'yyyy/MM/dd',
      'projection.date.interval': '1',
      'projection.date.interval.unit': 'DAYS',
      // `${date}` must reach Glue literally, so it is concatenated as a plain
      // string instead of being written inside the template literal.
      'storage.location.template':
        `s3://${sourceDataBucket.bucketName}/data/` + '${date}',
    };

    // Athena workgroup whose query results are written to the results bucket.
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Start the query taken from the execution input; the response (including
    // QueryExecutionId) is stored under $.startAthenaQueryExecutionTaskOutPut.
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.stringAt(
            '$.executionInput.queryString',
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
        },
      );

    // Poll the query execution and keep only its state.
    const getAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaGetQueryExecution(
        this,
        'getAthenaQueryExecutionTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            queryState: aws_stepfunctions.JsonPath.stringAt(
              '$.QueryExecution.Status.State',
            ),
          },
          resultPath: '$.getAthenaQueryExecutionTaskOutPut',
        },
      );

    // Wait 10 seconds between polls.
    const wait10seconds = new aws_stepfunctions.Wait(this, 'wait10seconds', {
      time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10)),
    });

    // Fetch the query results: column metadata plus the data rows.
    // NOTE(review): GetQueryResults is paginated and NextToken is not followed
    // here, so only the first page of rows is returned — confirm expected
    // result sizes fit in one page.
    const getAthenaQueryResultsTask =
      new aws_stepfunctions_tasks.AthenaGetQueryResults(
        this,
        'getAthenaQueryResultsTask',
        {
          queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
            '$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
          ),
          resultSelector: {
            resultColumnInfo: aws_stepfunctions.JsonPath.stringAt(
              '$.ResultSet.ResultSetMetadata.ColumnInfo',
            ),
            // Skips row 0, which holds the column headers.
            // NOTE(review): confirm the `(@.length-1)` end bound does not also
            // exclude the last data row (slice ends are usually exclusive).
            resultRaws: aws_stepfunctions.JsonPath.stringAt(
              '$.ResultSet.Rows[1:(@.length-1)]',
            ),
          },
          resultPath: '$.getAthenaQueryResultsTaskOutPut',
        },
      );

    // JavaScript evaluated by EvaluateExpression after `cols` (ColumnInfo) and
    // `rows` (data rows) have been substituted in. Each row becomes an object
    // keyed by column name. A value is converted with Number() only when its
    // column's Athena type is numeric, so numeric-looking strings (e.g. an ID
    // such as "001") stay strings and empty strings are not turned into 0.
    // Values without a VarCharValue stay undefined and are omitted from the
    // serialized object. bigint/decimal values beyond Number precision round.
    // Only double quotes are used inside, because the snippet is embedded in a
    // single-quoted States.Format string literal.
    const toJsonObjectsExpression = [
      'const numericTypes = ["tinyint","smallint","integer","bigint","real","float","double","decimal"];',
      'rows.map(row => {',
      'const j = {};',
      'cols.forEach((col, i) => {',
      'const val = row.Data[i].VarCharValue;',
      'j[col["Name"]] = val === undefined || !numericTypes.includes(col["Type"]) ? val : Number(val);',
      '});',
      'return j;',
      '})',
    ].join(' ');

    // Convert the Athena result set into simple JSON objects.
    const convertResultJsonTask =
      new aws_stepfunctions_tasks.EvaluateExpression(
        this,
        'convertResultJsonTask',
        {
          expression: aws_stepfunctions.JsonPath.format(
            'const cols = {}; const rows = {}; {};',
            aws_stepfunctions.JsonPath.stringAt(
              '$.getAthenaQueryResultsTaskOutPut.resultColumnInfo',
            ),
            aws_stepfunctions.JsonPath.stringAt(
              '$.getAthenaQueryResultsTaskOutPut.resultRaws',
            ),
            toJsonObjectsExpression,
          ),
        },
      );

    // Terminal state for queries that end in any state other than SUCCEEDED
    // (e.g. FAILED or CANCELLED); fetching results for them would only error.
    const athenaQueryFailed = new aws_stepfunctions.Fail(
      this,
      'athenaQueryFailed',
      {
        error: 'AthenaQueryFailed',
        cause: 'Athena query execution finished in a state other than SUCCEEDED.',
      },
    );

    // Keep polling while the query is QUEUED/RUNNING, fetch and convert the
    // results once it SUCCEEDED, and fail explicitly otherwise.
    const queryStatePath = '$.getAthenaQueryExecutionTaskOutPut.queryState';
    const athenaQueryStateChoiceTask = new aws_stepfunctions.Choice(
      this,
      'athenaQueryStateChoiceTask',
    );
    athenaQueryStateChoiceTask
      .when(
        aws_stepfunctions.Condition.or(
          aws_stepfunctions.Condition.stringEquals(queryStatePath, 'QUEUED'),
          aws_stepfunctions.Condition.stringEquals(queryStatePath, 'RUNNING'),
        ),
        wait10seconds,
      )
      .when(
        aws_stepfunctions.Condition.stringEquals(queryStatePath, 'SUCCEEDED'),
        getAthenaQueryResultsTask.next(convertResultJsonTask),
      )
      .otherwise(athenaQueryFailed);

    // State machine: start query -> wait -> poll -> choice (loops back to wait).
    const stateMachine = new aws_stepfunctions.StateMachine(
      this,
      'stateMachine',
      {
        stateMachineName: 'stateMachine',
        definition: startAthenaQueryExecutionTask
          .next(wait10seconds)
          .next(getAthenaQueryExecutionTask)
          .next(athenaQueryStateChoiceTask),
      },
    );

    // The Athena task constructs only grant access to the default Glue
    // database, so grant read access to this stack's database and table.
    stateMachine.role.addToPrincipalPolicy(
      new aws_iam.PolicyStatement({
        actions: ['glue:Get*', 'glue:List*'],
        effect: aws_iam.Effect.ALLOW,
        resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn],
      }),
    );
  }
}
`AthenaStartQueryExecution`や`AthenaGetQueryExecution`クラスによる実装では、DefaultのGlue Data Catalog(TableおよびDatabase)へのアクセス権限しか付与されません。よって今回用のGlue Data Catalogリソースへのアクセス権限を明示的に付与しています。
`getAthenaQueryResultsTask`タスクでは、`resultSelector`で`$.ResultSet.Rows[1:(@.length-1)]`と指定することにより、取得結果データのヘッダー行を削除しています。
`convertResultJsonTask`タスクでは、カラム情報として`resultColumnInfo`を使用します。
上記のスタックを`cdk deploy`でデプロイします。すると次のようなDefinitionのState Machineが作成されます。
Definition
{
"StartAt": "startAthenaQueryExecutionTask",
"States": {
"startAthenaQueryExecutionTask": {
"Next": "wait10seconds",
"Type": "Task",
"ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
"Resource": "arn:aws:states:::athena:startQueryExecution",
"Parameters": {
"QueryString.$": "$.executionInput.queryString",
"ResultConfiguration": {},
"WorkGroup": "athenaWorkGroup"
}
},
"wait10seconds": {
"Type": "Wait",
"Seconds": 10,
"Next": "getAthenaQueryExecutionTask"
},
"athenaQueryStateChoiceTask": {
"Type": "Choice",
"Choices": [
{
"Or": [
{
"Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
"StringEquals": "QUEUED"
},
{
"Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
"StringEquals": "RUNNING"
}
],
"Next": "wait10seconds"
}
],
"Default": "getAthenaQueryResultsTask"
},
"getAthenaQueryExecutionTask": {
"Next": "athenaQueryStateChoiceTask",
"Type": "Task",
"ResultPath": "$.getAthenaQueryExecutionTaskOutPut",
"ResultSelector": {
"queryState.$": "$.QueryExecution.Status.State"
},
"Resource": "arn:aws:states:::athena:getQueryExecution",
"Parameters": {
"QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
}
},
"getAthenaQueryResultsTask": {
"Next": "convertResultJsonTask",
"Type": "Task",
"ResultPath": "$.getAthenaQueryResultsTaskOutPut",
"ResultSelector": {
"resultColumnInfo.$": "$.ResultSet.ResultSetMetadata.ColumnInfo",
"resultRaws.$": "$.ResultSet.Rows[1:(@.length-1)]"
},
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
}
},
"convertResultJsonTask": {
"End": true,
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:ProcessStack-Evalda2d1181604e4a4586941a6abd7fe42dF-YCTnZYU56eCr",
"Parameters": {
"expression.$": "States.Format('const cols = {}; const rows = {}; {};', $.getAthenaQueryResultsTaskOutPut.resultColumnInfo, $.getAthenaQueryResultsTaskOutPut.resultRaws, 'rows.map(row => { let j = {}; cols.forEach((col,i) => { const val = row.Data[i].VarCharValue; j[col[\"Name\"]] = isNaN(val) ? val : Number(val); return j; }); return j; })')",
"expressionAttributeValues": {}
}
}
}
}
動作確認
次のクエリをState Machineから実行してみます。
SELECT *
FROM "data_catalog"."source_data_glue_table"
WHERE date = '2022/06/29'
同クエリをAthenaのコンソールから実行した様子です。これと同じ結果が取得できることを期待します。
Inputを次のように指定してステートマシンを実行します。
Input
{
"executionInput": {
"queryString": "SELECT * FROM \"data_catalog\".\"source_data_glue_table\" WHERE date = '2022/06/29'"
}
}
実行が成功しました。
State MachineのOutputで期待したデータが取得できています!
Output
[
{
"userid": "u001",
"count": 14,
"date": "2022/06/29"
},
{
"userid": "u002",
"count": 10,
"date": "2022/06/29"
},
{
"userid": "u003",
"count": 12,
"date": "2022/06/29"
}
]
参考
以上