AWS Step FunctionsからAmazon AthenaでINSERT INTOクエリを実行してみた
こんにちは、CX事業本部 IoT事業部の若槻です。
Amazon Athenaでは、INSERT INTOクエリによりテーブルに対して新規レコードを追加することができます。
今回は、AWS Step FunctionsからAmazon AthenaでINSERT INTOクエリを実行してみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_s3, aws_athena, aws_stepfunctions, aws_stepfunctions_tasks, aws_glue, aws_iam, RemovalPolicy, Stack, StackProps, } from 'aws-cdk-lib'; import * as glue_alpha from '@aws-cdk/aws-glue-alpha'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // データソース格納バケット const dataBucket = new aws_s3.Bucket(this, 'dataBucket', { bucketName: `data-${this.account}-${this.region}`, removalPolicy: RemovalPolicy.DESTROY, }); // Athenaクエリ結果格納バケット const athenaQueryResultBucket = new aws_s3.Bucket( this, 'athenaQueryResultBucket', { bucketName: `athena-query-result-${this.account}`, removalPolicy: RemovalPolicy.DESTROY, } ); // データカタログ const dataCatalog = new glue_alpha.Database(this, 'dataCatalog', { databaseName: 'data_catalog', }); // データカタログテーブル const dataCatalogTable = new glue_alpha.Table(this, 'dataCatalogTable', { tableName: 'data_catalog_table', database: dataCatalog, bucket: dataBucket, s3Prefix: 'data/', dataFormat: glue_alpha.DataFormat.JSON, columns: [ { name: 'id', type: glue_alpha.Schema.STRING, }, { name: 'count', type: glue_alpha.Schema.INTEGER, }, ], }); // SerDeを既定のOpenX JSONからHive JSONにオーバーライド const cfnTable = dataCatalogTable.node.defaultChild as aws_glue.CfnTable; cfnTable.addPropertyOverride( 'TableInput.StorageDescriptor.SerdeInfo.SerializationLibrary', glue_alpha.SerializationLibrary.HIVE_JSON.className ); // Athenaワークグループ const athenaWorkGroup = new aws_athena.CfnWorkGroup( this, 'athenaWorkGroup', { name: 'athenaWorkGroup', workGroupConfiguration: { resultConfiguration: { outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`, }, }, recursiveDeleteOption: true, } ); // JSON ArrayをINSERT INTOクエリで指定可能な形式に変換 const getInsertedValuesTask = new aws_stepfunctions_tasks.EvaluateExpression( this, 'getInsertedValuesTask', { expression: aws_stepfunctions.JsonPath.format( `{}.map( d => "('" + d.id + "'," + d.count + ")").join(',')`, aws_stepfunctions.JsonPath.stringAt('$.input.records') ), resultPath: '$.getInsertedValuesTaskOutPut', } ); // Athenaクエリ実行 const startAthenaQueryExecutionTask = new aws_stepfunctions_tasks.AthenaStartQueryExecution( this, 'startAthenaQueryExecutionTask', { queryString: aws_stepfunctions.JsonPath.format( `INSERT INTO "${dataCatalog.databaseName}"."${dataCatalogTable.tableName}" VALUES {}`, aws_stepfunctions.JsonPath.stringAt('$.getInsertedValuesTaskOutPut') ), workGroup: athenaWorkGroup.name, integrationPattern: aws_stepfunctions.IntegrationPattern.RUN_JOB, resultPath: '$.startAthenaQueryExecutionTaskOutPut', } ); // State Machine const stateMachine = new aws_stepfunctions.StateMachine( this, 'stateMachine', { stateMachineName: 'stateMachine', definition: getInsertedValuesTask.next(startAthenaQueryExecutionTask), } ); // Glue Data Catalogリソースへのアクセス権をState Machineに追加 stateMachine.role.addToPrincipalPolicy( new aws_iam.PolicyStatement({ actions: ['glue:Get*', 'glue:List*'], effect: aws_iam.Effect.ALLOW, resources: [dataCatalog.databaseArn, dataCatalogTable.tableArn], }) ); } }
glue_alpha.Table
クラスで既定のJSON SerDeとなるOpenX JSON SerDeはINSERT INTOクエリに非対応なので、プロパティをオーバーライドしてHive JSON SerDeとしています。- INSERT対象のレコードはJSON Array形式でInputや前段のタスクのレスポンスから取得される想定です。しかしINSERTクエリではレコードを
(col1value,col2value,...)[,(col1value,col2value,...)][,...]
という形式で指定する必要があるのでaws_stepfunctions_tasks.EvaluateExpression
で形式変換を行っています。
上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。
{ "StartAt": "getInsertedValuesTask", "States": { "getInsertedValuesTask": { "Next": "startAthenaQueryExecutionTask", "Type": "Task", "ResultPath": "$.getInsertedValuesTaskOutPut", "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:ProcessStack-Evalda2d1181604e4a4586941a6abd7fe42dF-2nzgHa97p3jq", "Parameters": { "expression.$": "States.Format('{}.map( d => \"(\\'\" + d.id + \"\\',\" + d.count + \")\").join(\\',\\')', $.input.records)", "expressionAttributeValues": {} } }, "startAthenaQueryExecutionTask": { "End": true, "Type": "Task", "ResultPath": "$.startAthenaQueryExecutionTaskOutPut", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString.$": "States.Format('INSERT INTO \"data_catalog\".\"data_catalog_table\" VALUES {}', $.getInsertedValuesTaskOutPut)", "ResultConfiguration": {}, "WorkGroup": "athenaWorkGroup" } } } }
動作確認
次のInputを指定してState Machineを実行します。
{ "input": { "records": [ { "id": "001", "count": 3 }, { "id": "002", "count": 5 }, { "id": "003", "count": 1 } ] } }
実行が成功しました。
State Machineから実行されたクエリは以下になります。
INSERT INTO "data_catalog"."data_catalog_table" VALUES ('001',3),('002',5),('003',1)
テーブルに対してSELECTクエリを実行すると、データを取得することができました。レコードがちゃんとINSERTされていますね!
おわりに
AWS Step FunctionsからAmazon AthenaでINSERT INTOクエリを実行してみました。
以前にAWS Step FunctionsでJSON LinesデータをS3 BucketにPut Objectしようとして出来なかったことがありましたが、複数件のレコードをS3バケットに格納したいのであればのAthenaのINSERTクエリを使用する方法を使うべきでしたね。
参考
- [aws-glue] glue.DataFormat.TSV is broken · Issue #11239 · aws/aws-cdk
- class SerializationLibrary · AWS CDK
- AWS Step FunctionsでAthenaクエリ実行の開始および結果取得をしてみた(AWS CDK v2) | DevelopersIO
- Amazon AthenaでINSERT INTOクエリが”HIVE_UNSUPPORTED_FORMAT”エラーとなる際の対処 | DevelopersIO
- [AWS Step Functions / AWS CDK] EvaluateExpressionタスクを使って配列の操作(要素追加、結合、Map処理など)をしてみた | DevelopersIO
以上