AWS Step Functionsの組み込み関数States.ArrayPartitionを使ってDynamoDBのTransactWriteItems APIでリクエストするオブジェクト数を上限数25件毎に分割してみた

AWS Step Functionsの組み込み関数States.ArrayPartitionを使ってDynamoDBのTransactWriteItems APIでリクエストするオブジェクト数を上限数25件毎に分割してみた

Clock Icon2022.09.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

DynamoDBのTransactWriteItems APIを実行する際は、1回のAPIコール毎に25リクエストの上限があります。

この上限はStep FunctionsからTransactWriteItems APIをコールする場合ももちろん適用されるので、実装する際には予め25個ずつに分割したデータをInputするようにするか、Step Functions内でLambdaの処理を挟んで分割させる等の実装上の工夫が必要でした。(なので以前にこんなブログを書いていました。)

しかしそんな折にStep Functionsの新しい組み込み関数としてStates.ArrayPartitionが追加されました。

States.ArrayPartitionを使えば次のように配列の要素を指定した個数毎のチャンクの配列に分割することができます。これ、前述のTransactWriteItems APIをコールするための25個ずつの分割にそのまま利用できそうですね。

// Input
{"inputArray": [1,2,3,4,5,6,7,8,9] }

// Functions
"inputArray.$": "States.ArrayPartition($.inputArray,4)"

// Output
{"inputArray": [ [1,2,3,4], [5,6,7,8], [9]] }

そこで今回は、AWS Step Functionsの組み込み関数States.ArrayPartitionを使ってDynamoDBのTransactWriteItems APIでリクエストするオブジェクト数を上限数25件毎に分割する実装(AWS CDK)をしてみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

import {
  aws_stepfunctions,
  aws_dynamodb,
  Stack,
  StackProps,
  RemovalPolicy,
  aws_stepfunctions_tasks,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class AwsCdkAppStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // DynamoDBテーブル
    const dataTable = new aws_dynamodb.Table(this, 'dataTable', {
      tableName: 'dataTable',
      partitionKey: {
        name: 'id',
        type: aws_dynamodb.AttributeType.STRING,
      },
      billingMode: aws_dynamodb.BillingMode.PAY_PER_REQUEST,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // States.ArrayPartitionでトランザクション処理対象のアイテムの配列を25件毎に分割するTask
    const itemsPartitionPass = new aws_stepfunctions.Pass(
      this,
      'itemsPartitionPass',
      {
        parameters: {
          'partitionedItems.$': 'States.ArrayPartition($.inputItems,25)',
        },
      }
    );

    // TransactWriteItemsリクエスト作成Pass
    const getTransactWriteItemsRequestTask = new aws_stepfunctions.Pass(
      this,
      'getTransactWriteItemsRequestTask',
      {
        parameters: {
          Put: {
            TableName: dataTable.tableName,
            Item: {
              id: { S: aws_stepfunctions.JsonPath.stringAt('$.id') },
            },
          },
        },
      }
    );

    // TransactWriteItems作成Map
    const getTransactWriteItemsMap = new aws_stepfunctions.Map(
      this,
      'getTransactWriteItemsMap',
      {
        itemsPath: aws_stepfunctions.JsonPath.stringAt('$'),
      }
    ).iterator(getTransactWriteItemsRequestTask);

    // TransactWriteItems実行Task
    const putDataTask = new aws_stepfunctions_tasks.CallAwsService(
      this,
      'putDataTask',
      {
        service: 'dynamodb',
        action: 'transactWriteItems',
        parameters: {
          TransactItems: aws_stepfunctions.JsonPath.stringAt('$'),
        },
        iamResources: [dataTable.tableArn],
        iamAction: 'dynamodb:*',
      }
    );

    // 分割されたPartition毎にTransactWriteItemsを実行するMap
    const transactWriteItemsMap = new aws_stepfunctions.Map(
      this,
      'transactWriteItemsMap',
      {
        itemsPath: aws_stepfunctions.JsonPath.stringAt('$.partitionedItems'),
      }
    ).iterator(getTransactWriteItemsMap.next(putDataTask));

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'myStateMachine', {
      stateMachineName: 'myStateMachine',
      definition: itemsPartitionPass.next(transactWriteItemsMap),
    });
  }
}

上記をCDK Deployしてスタックをデプロイします。これにより次の定義のState Machineが作成されます。

{
  "StartAt": "itemsPartitionPass",
  "States": {
    "itemsPartitionPass": {
      "Type": "Pass",
      "Parameters": {
        "partitionedItems.$": "States.ArrayPartition($.inputItems,25)"
      },
      "Next": "transactWriteItemsMap"
    },
    "transactWriteItemsMap": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "getTransactWriteItemsMap",
        "States": {
          "getTransactWriteItemsMap": {
            "Type": "Map",
            "Next": "putDataTask",
            "Iterator": {
              "StartAt": "getTransactWriteItemsRequestTask",
              "States": {
                "getTransactWriteItemsRequestTask": {
                  "Type": "Pass",
                  "Parameters": {
                    "Put": {
                      "TableName": "dataTable",
                      "Item": {
                        "id": {
                          "S.$": "$.id"
                        }
                      }
                    }
                  },
                  "End": true
                }
              }
            },
            "ItemsPath": "$"
          },
          "putDataTask": {
            "End": true,
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:dynamodb:transactWriteItems",
            "Parameters": {
              "TransactItems.$": "$"
            }
          }
        }
      },
      "ItemsPath": "$.partitionedItems"
    }
  }
}

State Machine Graphは次のようになります。

動作確認

次の30個のアイテムをInputに指定してState Machineを実行してみます。

{
    "inputItems": [
      { "id": "001" },
      { "id": "002" },
      { "id": "003" },
      { "id": "004" },
      { "id": "005" },
      { "id": "006" },
      { "id": "007" },
      { "id": "008" },
      { "id": "009" },
      { "id": "010" },
      { "id": "011" },
      { "id": "012" },
      { "id": "013" },
      { "id": "014" },
      { "id": "015" },
      { "id": "016" },
      { "id": "017" },
      { "id": "018" },
      { "id": "019" },
      { "id": "020" },
      { "id": "021" },
      { "id": "022" },
      { "id": "023" },
      { "id": "024" },
      { "id": "025" },
      { "id": "026" },
      { "id": "027" },
      { "id": "028" },
      { "id": "029" },
      { "id": "030" }      
    ]
}

すると実行が成功しました。itemsPartitionPassのOutputを見ると、アイテムがちゃんと25個ずつに分割されています。

{
  "partitionedItems": [
    [
      {
        "id": "001"
      },
      {
        "id": "002"
      },
      {
        "id": "003"
      },
      {
        "id": "004"
      },
      {
        "id": "005"
      },
      {
        "id": "006"
      },
      {
        "id": "007"
      },
      {
        "id": "008"
      },
      {
        "id": "009"
      },
      {
        "id": "010"
      },
      {
        "id": "011"
      },
      {
        "id": "012"
      },
      {
        "id": "013"
      },
      {
        "id": "014"
      },
      {
        "id": "015"
      },
      {
        "id": "016"
      },
      {
        "id": "017"
      },
      {
        "id": "018"
      },
      {
        "id": "019"
      },
      {
        "id": "020"
      },
      {
        "id": "021"
      },
      {
        "id": "022"
      },
      {
        "id": "023"
      },
      {
        "id": "024"
      },
      {
        "id": "025"
      }
    ],
    [
      {
        "id": "026"
      },
      {
        "id": "027"
      },
      {
        "id": "028"
      },
      {
        "id": "029"
      },
      {
        "id": "030"
      }
    ]
  ]
}

putDataTaskのInputを見ると、transactWriteItemsMap - iteration #0ではID値が001から025までの25個のアイテムのPutItemリクエストとなっています。

transactWriteItemsMap - iteration #1ではID値が026から030までの5個のアイテムのPutItemリクエストとなっています。

テーブルをスキャンするとちゃんと30個のアイテムすべてがPutされていますね!

$ aws dynamodb scan --table-name dataTable --query "sort_by(Items, &id.S)[*].id.S"
[
    "001",
    "002",
    "003",
    "004",
    "005",
    "006",
    "007",
    "008",
    "009",
    "010",
    "011",
    "012",
    "013",
    "014",
    "015",
    "016",
    "017",
    "018",
    "019",
    "020",
    "021",
    "022",
    "023",
    "024",
    "025",
    "026",
    "027",
    "028",
    "029",
    "030"
]

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.