この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部 IoT事業部の若槻です。
Amazon Athenaを使用すると、SQLライクなクエリ文字列でS3 Bucketなどに格納されているデータを直接分析することが可能です。
今回は、AWS Step FunctionsからAthenaクエリ実行の開始および結果取得をしてみました。
やってみた
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/process-stack.ts
import { Construct } from 'constructs';
import {
aws_s3,
aws_stepfunctions,
aws_stepfunctions_tasks,
aws_athena,
RemovalPolicy,
Duration,
Stack,
StackProps,
} from 'aws-cdk-lib';
export class ProcessStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// Athenaクエリ結果格納バケット
const athenaQueryResultBucket = new aws_s3.Bucket(
this,
'athenaQueryResultBucket',
{
bucketName: `athena-query-result-${this.account}`,
removalPolicy: RemovalPolicy.DESTROY,
},
);
// Athenaワークグループ
const athenaWorkGroup = new aws_athena.CfnWorkGroup(
this,
'athenaWorkGroup',
{
name: 'athenaWorkGroup',
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
},
},
},
);
// Athenaクエリ実行開始
const startAthenaQueryExecutionTask =
new aws_stepfunctions_tasks.AthenaStartQueryExecution(
this,
'startAthenaQueryExecutionTask',
{
queryString: aws_stepfunctions.JsonPath.stringAt(
'$.executionInput.queryString',
),
workGroup: athenaWorkGroup.name,
resultPath: '$.startAthenaQueryExecutionTaskOutPut',
},
);
// Athenaクエリ実行取得
const getAthenaQueryExecutionTask =
new aws_stepfunctions_tasks.AthenaGetQueryExecution(
this,
'getAthenaQueryExecutionTask',
{
queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
'$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
),
resultSelector: {
queryState: aws_stepfunctions.JsonPath.stringAt(
'$.QueryExecution.Status.State',
),
},
resultPath: '$.getAthenaQueryExecutionTaskOutPut',
},
);
// 10秒待機
const wait10seconds = new aws_stepfunctions.Wait(this, 'wait10seconds', {
time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10)),
});
// Athenaクエリ結果取得
const getAthenaQueryResultsTask =
new aws_stepfunctions_tasks.AthenaGetQueryResults(
this,
'getAthenaQueryResultsTask',
{
queryExecutionId: aws_stepfunctions.JsonPath.stringAt(
'$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId',
),
},
);
// Athenaクエリが完了するまで待機
const athenaQueryStateChoiceTask = new aws_stepfunctions.Choice(
this,
'athenaQueryStateChoiceTask',
);
athenaQueryStateChoiceTask.when(
aws_stepfunctions.Condition.or(
aws_stepfunctions.Condition.stringEquals(
aws_stepfunctions.JsonPath.stringAt(
'$.getAthenaQueryExecutionTaskOutPut.queryState',
),
'QUEUED',
),
aws_stepfunctions.Condition.stringEquals(
aws_stepfunctions.JsonPath.stringAt(
'$.getAthenaQueryExecutionTaskOutPut.queryState',
),
'RUNNING',
),
),
wait10seconds,
);
athenaQueryStateChoiceTask.otherwise(getAthenaQueryResultsTask);
// State Machine
new aws_stepfunctions.StateMachine(this, 'stateMachine', {
stateMachineName: 'stateMachine',
definition: startAthenaQueryExecutionTask
.next(wait10seconds)
.next(getAthenaQueryExecutionTask)
.next(athenaQueryStateChoiceTask),
});
}
}
- Athenaクエリの実行は非同期なので実行開始後に10秒待機し、実行が完了するまで10秒待機および実行結果の確認を繰り返します。
上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。
Definition
{
"StartAt": "startAthenaQueryExecutionTask",
"States": {
"startAthenaQueryExecutionTask": {
"Next": "wait10seconds",
"Type": "Task",
"ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
"Resource": "arn:aws:states:::athena:startQueryExecution",
"Parameters": {
"QueryString.$": "$.executionInput.queryString",
"ResultConfiguration": {},
"WorkGroup": "athenaWorkGroup"
}
},
"wait10seconds": {
"Type": "Wait",
"Seconds": 10,
"Next": "getAthenaQueryExecutionTask"
},
"athenaQueryStateChoiceTask": {
"Type": "Choice",
"Choices": [
{
"Or": [
{
"Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
"StringEquals": "QUEUED"
},
{
"Variable": "$.getAthenaQueryExecutionTaskOutPut.queryState",
"StringEquals": "RUNNING"
}
],
"Next": "wait10seconds"
}
],
"Default": "getAthenaQueryResultsTask"
},
"getAthenaQueryExecutionTask": {
"Next": "athenaQueryStateChoiceTask",
"Type": "Task",
"ResultPath": "$.getAthenaQueryExecutionTaskOutPut",
"ResultSelector": {
"queryState.$": "$.QueryExecution.Status.State"
},
"Resource": "arn:aws:states:::athena:getQueryExecution",
"Parameters": {
"QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
}
},
"getAthenaQueryResultsTask": {
"End": true,
"Type": "Task",
"Resource": "arn:aws:states:::athena:getQueryResults",
"Parameters": {
"QueryExecutionId.$": "$.startAthenaQueryExecutionTaskOutPut.QueryExecutionId"
}
}
}
}
動作確認
次のクエリで確認してみます。
SELECT ARRAY [1,2,3,4] AS items
クエリをコンソールで実行した様子です。結果でレコードが1つ取得できています。
次の入力を指定してステートマシンを実行します。
Input
{
"executionInput": {
"queryString": "SELECT ARRAY [1,2,3,4] AS items"
}
}
すると実行が成功しました。
getAthenaQueryResultsTask
のOutputを見ると、クエリ結果が取得できています!
getAthenaQueryResultsTask Output
{
"ResultSet": {
"ResultSetMetadata": {
"ColumnInfo": [
{
"CaseSensitive": false,
"CatalogName": "hive",
"Label": "items",
"Name": "items",
"Nullable": "UNKNOWN",
"Precision": 0,
"Scale": 0,
"SchemaName": "",
"TableName": "",
"Type": "array"
}
]
},
"Rows": [
{
"Data": [
{
"VarCharValue": "items"
}
]
},
{
"Data": [
{
"VarCharValue": "[1, 2, 3, 4]"
}
]
}
]
},
"UpdateCount": 0
}
おわりに
AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみました。難なく実装ができたので流石はStep Functionsと言ったところです。
ただしathena:getQueryResults APIにより取得した実行結果のJson Objectはそのままでは扱いにくい形式なので、取得後にEvaluateExpressionタスクなどで上手いこと変換したいですね。
参考
- Querying arrays - Amazon Athena
- athena-Type-QueryExecutionStatus-State - QueryExecutionStatus - Athena
- athena-Type-QueryExecutionStatus-State - QueryExecutionStatus - Athena
- class AthenaGetQueryExecution (construct) · AWS CDK
以上