こんにちは、CX事業本部 IoT事業部の若槻です。
DynamoDBのTransactWriteItems APIを実行する際は、1回のAPIコール毎に25リクエストの上限があります。
この上限はStep FunctionsからTransactWriteItems APIをコールする場合ももちろん適用されるので、実装する際には予め25個ずつに分割したデータをInputするようにするか、Step Functions内でLambdaの処理を挟んで分割させる等の実装上の工夫が必要でした。(なので以前にこんなブログを書いていました。)
しかしそんな折にStep Functionsの新しい組み込み関数としてStates.ArrayPartitionが追加されました。
States.ArrayPartition
を使えば次のように配列の要素を指定した個数毎のチャンクの配列に分割することができます。これ、前述のTransactWriteItems APIをコールするための25個ずつの分割にそのまま利用できそうですね。
// Input
{"inputArray": [1,2,3,4,5,6,7,8,9] }
// Functions
"inputArray.$": "States.ArrayPartition($.inputArray,4)"
// Output
{"inputArray": [ [1,2,3,4], [5,6,7,8], [9]] }
そこで今回は、AWS Step Functionsの組み込み関数States.ArrayPartition
を使ってDynamoDBのTransactWriteItems APIでリクエストするオブジェクト数を上限数25件毎に分割する実装(AWS CDK)をしてみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
lib/aws-cdk-app-stack.ts
import {
aws_stepfunctions,
aws_dynamodb,
Stack,
StackProps,
RemovalPolicy,
aws_stepfunctions_tasks,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
export class AwsCdkAppStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// DynamoDBテーブル
const dataTable = new aws_dynamodb.Table(this, 'dataTable', {
tableName: 'dataTable',
partitionKey: {
name: 'id',
type: aws_dynamodb.AttributeType.STRING,
},
billingMode: aws_dynamodb.BillingMode.PAY_PER_REQUEST,
removalPolicy: RemovalPolicy.DESTROY,
});
// States.ArrayPartitionでトランザクション処理対象のアイテムの配列を25件毎に分割するTask
const itemsPartitionPass = new aws_stepfunctions.Pass(
this,
'itemsPartitionPass',
{
parameters: {
'partitionedItems.$': 'States.ArrayPartition($.inputItems,25)',
},
}
);
// TransactWriteItemsリクエスト作成Pass
const getTransactWriteItemsRequestTask = new aws_stepfunctions.Pass(
this,
'getTransactWriteItemsRequestTask',
{
parameters: {
Put: {
TableName: dataTable.tableName,
Item: {
id: { S: aws_stepfunctions.JsonPath.stringAt('$.id') },
},
},
},
}
);
// TransactWriteItems作成Map
const getTransactWriteItemsMap = new aws_stepfunctions.Map(
this,
'getTransactWriteItemsMap',
{
itemsPath: aws_stepfunctions.JsonPath.stringAt('$'),
}
).iterator(getTransactWriteItemsRequestTask);
// TransactWriteItems実行Task
const putDataTask = new aws_stepfunctions_tasks.CallAwsService(
this,
'putDataTask',
{
service: 'dynamodb',
action: 'transactWriteItems',
parameters: {
TransactItems: aws_stepfunctions.JsonPath.stringAt('$'),
},
iamResources: [dataTable.tableArn],
iamAction: 'dynamodb:*',
}
);
// 分割されたPartition毎にTransactWriteItemsを実行するMap
const transactWriteItemsMap = new aws_stepfunctions.Map(
this,
'transactWriteItemsMap',
{
itemsPath: aws_stepfunctions.JsonPath.stringAt('$.partitionedItems'),
}
).iterator(getTransactWriteItemsMap.next(putDataTask));
// State Machine
new aws_stepfunctions.StateMachine(this, 'myStateMachine', {
stateMachineName: 'myStateMachine',
definition: itemsPartitionPass.next(transactWriteItemsMap),
});
}
}
上記をCDK Deployしてスタックをデプロイします。これにより次の定義のState Machineが作成されます。
Definition
{
"StartAt": "itemsPartitionPass",
"States": {
"itemsPartitionPass": {
"Type": "Pass",
"Parameters": {
"partitionedItems.$": "States.ArrayPartition($.inputItems,25)"
},
"Next": "transactWriteItemsMap"
},
"transactWriteItemsMap": {
"Type": "Map",
"End": true,
"Iterator": {
"StartAt": "getTransactWriteItemsMap",
"States": {
"getTransactWriteItemsMap": {
"Type": "Map",
"Next": "putDataTask",
"Iterator": {
"StartAt": "getTransactWriteItemsRequestTask",
"States": {
"getTransactWriteItemsRequestTask": {
"Type": "Pass",
"Parameters": {
"Put": {
"TableName": "dataTable",
"Item": {
"id": {
"S.$": "$.id"
}
}
}
},
"End": true
}
}
},
"ItemsPath": "$"
},
"putDataTask": {
"End": true,
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:transactWriteItems",
"Parameters": {
"TransactItems.$": "$"
}
}
}
},
"ItemsPath": "$.partitionedItems"
}
}
}
State Machine Graphは次のようになります。
動作確認
次の30個のアイテムをInputに指定してState Machineを実行してみます。
Input
{
"inputItems": [
{ "id": "001" },
{ "id": "002" },
{ "id": "003" },
{ "id": "004" },
{ "id": "005" },
{ "id": "006" },
{ "id": "007" },
{ "id": "008" },
{ "id": "009" },
{ "id": "010" },
{ "id": "011" },
{ "id": "012" },
{ "id": "013" },
{ "id": "014" },
{ "id": "015" },
{ "id": "016" },
{ "id": "017" },
{ "id": "018" },
{ "id": "019" },
{ "id": "020" },
{ "id": "021" },
{ "id": "022" },
{ "id": "023" },
{ "id": "024" },
{ "id": "025" },
{ "id": "026" },
{ "id": "027" },
{ "id": "028" },
{ "id": "029" },
{ "id": "030" }
]
}
すると実行が成功しました。itemsPartitionPass
のOutputを見ると、アイテムがちゃんと25個ずつに分割されています。
itemsPartitionPass Output
{
"partitionedItems": [
[
{
"id": "001"
},
{
"id": "002"
},
{
"id": "003"
},
{
"id": "004"
},
{
"id": "005"
},
{
"id": "006"
},
{
"id": "007"
},
{
"id": "008"
},
{
"id": "009"
},
{
"id": "010"
},
{
"id": "011"
},
{
"id": "012"
},
{
"id": "013"
},
{
"id": "014"
},
{
"id": "015"
},
{
"id": "016"
},
{
"id": "017"
},
{
"id": "018"
},
{
"id": "019"
},
{
"id": "020"
},
{
"id": "021"
},
{
"id": "022"
},
{
"id": "023"
},
{
"id": "024"
},
{
"id": "025"
}
],
[
{
"id": "026"
},
{
"id": "027"
},
{
"id": "028"
},
{
"id": "029"
},
{
"id": "030"
}
]
]
}
putDataTask
のInputを見ると、transactWriteItemsMap - iteration #0
ではID値が001
から025
までの25個のアイテムのPutItemリクエストとなっています。
transactWriteItemsMap - iteration #1
ではID値が026
から030
までの5個のアイテムのPutItemリクエストとなっています。
テーブルをスキャンするとちゃんと30個のアイテムすべてがPutされていますね!
$ aws dynamodb scan --table-name dataTable --query "sort_by(Items, &id.S)[*].id.S"
[
"001",
"002",
"003",
"004",
"005",
"006",
"007",
"008",
"009",
"010",
"011",
"012",
"013",
"014",
"015",
"016",
"017",
"018",
"019",
"020",
"021",
"022",
"023",
"024",
"025",
"026",
"027",
"028",
"029",
"030"
]
参考
以上