この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 IoT事業部の若槻です。
前回のエントリでは、Amazon Athena Parameterized QueriesをCDKで作ってみました。
今回はその発展編として、Amazon Athena Parameterized QueriesをAWS Step Functionsから実行する構成をAWS CDKで作ってみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_athena,
aws_stepfunctions_tasks,
aws_stepfunctions,
RemovalPolicy,
Stack,
StackProps,
aws_iam,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// ソースデータ格納バケット
const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
bucketName: `data-${this.account}`,
removalPolicy: RemovalPolicy.DESTROY,
});
// Athenaクエリ結果格納バケット
const athenaQueryResultBucket = new aws_s3.Bucket(
this,
'athenaQueryResultBucket',
{
bucketName: `athena-query-result-${this.account}`,
removalPolicy: RemovalPolicy.DESTROY,
},
);
// データカタログ
const dataCatalog = new glue.Database(this, 'dataCatalog', {
databaseName: 'data_catalog',
});
// データカタログテーブル
const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
tableName: 'source_data_glue_table',
database: dataCatalog,
bucket: sourceDataBucket,
s3Prefix: 'data/',
partitionKeys: [
{
name: 'date',
type: glue.Schema.STRING,
},
],
dataFormat: glue.DataFormat.JSON,
columns: [
{
name: 'userId',
type: glue.Schema.STRING,
},
{
name: 'count',
type: glue.Schema.INTEGER,
},
],
});
// データカタログテーブルへのPartition Projectionの設定
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(sourceDataGlueTable.node.defaultChild as any).tableInput.parameters = {
'projection.enabled': true,
'projection.date.type': 'date',
'projection.date.range': '2022/06/20,NOW',
'projection.date.format': 'yyyy/MM/dd',
'projection.date.interval': 1,
'projection.date.interval.unit': 'DAYS',
'storage.location.template':
`s3://${sourceDataBucket.bucketName}/data/` + '${date}',
};
// Athenaワークグループ
const athenaWorkGroup = new aws_athena.CfnWorkGroup(
this,
'athenaWorkGroup',
{
name: 'athenaWorkGroup',
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
},
},
},
);
// Prepared Queries
const athenaPreparedStatement = new aws_athena.CfnPreparedStatement(
this,
'athenaPreparedStatement',
{
statementName: 'athenaPreparedStatement',
workGroup: athenaWorkGroup.name,
queryStatement: `
SELECT *
FROM ${dataCatalog.databaseName}.${sourceDataGlueTable.tableName}
WHERE date = ?
LIMIT 10`,
},
);
// Athenaクエリ実行開始
const startAthenaQueryExecutionTask =
new aws_stepfunctions_tasks.AthenaStartQueryExecution(
this,
'startAthenaQueryExecutionTask',
{
queryString: aws_stepfunctions.JsonPath.format(
`EXECUTE ${athenaPreparedStatement.statementName} USING '{}'`,
aws_stepfunctions.JsonPath.stringAt('$.query_dt'),
),
workGroup: athenaWorkGroup.name,
resultPath: '$.startAthenaQueryExecutionTaskOutPut',
},
);
// ステートマシン
const stateMachine = new aws_stepfunctions.StateMachine(
this,
'stateMachine',
{
stateMachineName: 'stateMachine',
definition: startAthenaQueryExecutionTask,
},
);
// Glue Data Catalogリソースへのアクセス権をState Machineに追加
stateMachine.role.addToPrincipalPolicy(
new aws_iam.PolicyStatement({
actions: ['glue:Get*', 'glue:List*'],
effect: aws_iam.Effect.ALLOW,
resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn],
}),
);
// PreparedStatementを実行するためのアクセス権をState Machineに追加
stateMachine.addToRolePolicy(
new aws_iam.PolicyStatement({
actions: ['athena:GetPreparedStatement'],
effect: aws_iam.Effect.ALLOW,
resources: [
`arn:aws:athena:${this.region}:${this.account}:workgroup/${athenaWorkGroup.name}`,
],
}),
);
}
}
AthenaStartQueryExecution
Taskで、Query StringEXECUTE ${athenaPreparedStatement.statementName} USING '{}'
によりParameterized Queriesを実行しています。- この時、文字列型のParameter値の指定は
'{}'
のようにシングルクォートで囲む必要があります。
- この時、文字列型のParameter値の指定は
- State MachineはWorkgroupに対して
athena:GetPreparedStatement
Actionを行うので、Policyを付与します。
上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。
State Machine Definition
{
"StartAt": "startAthenaQueryExecutionTask",
"States": {
"startAthenaQueryExecutionTask": {
"End": true,
"Type": "Task",
"ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
"Resource": "arn:aws:states:::athena:startQueryExecution",
"Parameters": {
"QueryString.$": "States.Format('EXECUTE athenaPreparedStatement USING {}', $.query_dt)",
"ResultConfiguration": {},
"WorkGroup": "athenaWorkGroup"
}
}
}
}
動作確認
次の入力を指定してステートマシンを実行します。
Input
{
"query_dt": "2022/06/28"
}
実行が成功しました。
AthenaのQuery editorで履歴を見ると、実行が正常に完了しています。
同QueryをEditorで開くと、データ取得もちゃんと出来ていますね!
参考
- AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみた(AWS CDK v2) | DevelopersIO
- AWS Step FunctionsからS3 Bucket内のデータに対するAthenaクエリを実行して結果を取得する(AWS CDK v2) | DevelopersIO
以上