Step FunctionsステートマシンからDynamoDBテーブルに対して26件以上のUpdateItem操作をしたい(AWS CDK)

2021.11.27

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、AWS Step FunctionsステートマシンからAmazon DynamoDBテーブルに対して26件以上のUpdateItem操作を行う構成をAWS CDKで作成してみました。

TransactWriteItemsで一度に更新処理できる件数は最大25件

以前の下記エントリでは、ステートマシンからDynamoDBテーブル上の複数アイテムのUpdateItemをTransactWriteItemsを使って行いました。

しかしこのTransactWriteItemsは一度に更新処理できる件数は最大25件という制限があります。

TransactWriteItems is a synchronous write operation that groups up to 25 action requests.

そこで今回は、ステートマシンから26件以上のUpdateItem操作を、TransactWriteItems操作をMapステートでループ実行する構成で実装してみます。

やってみた

CDKコード

lib/aws-cdk-app-stack.ts

import * as cdk from '@aws-cdk/core';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as dynamodb from '@aws-cdk/aws-dynamodb';

export class AwsCdkAppStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    //UpdateItem対象のDynamoDBテーブル
    const deviceTable = new dynamodb.Table(this, 'deviceTable', {
      tableName: 'deviceTable',
      partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    //transactWriteItemsアクション
    const updateDeviceData = new tasks.CallAwsService(
      this,
      'updateDeviceData',
      {
        service: 'dynamodb',
        action: 'transactWriteItems',
        parameters: {
          'TransactItems.$': '$.transactItems',
        },
        iamResources: [deviceTable.tableArn],
        iamAction: 'dynamodb:*',
      }
    );

    //Mapステート
    const updateDeviceDataMap = new sfn.Map(this, 'updateDeviceDataMap', {
      itemsPath: sfn.JsonPath.stringAt('$.mapItems'),
      parameters: {
        'transactItems.$': '$$.Map.Item.Value.TransactItems',
      },
    });

    updateDeviceDataMap.iterator(updateDeviceData);

    //ステートマシン
    new sfn.StateMachine(this, 'updateDeviceDataStateMachine', {
      stateMachineName: 'updateDeviceDataStateMachine',
      definition: updateDeviceDataMap,
    });
  }
}

cdk deployでデプロイします。

すると下記のような定義のステートマシンが作成されます。

{
  "StartAt": "updateDeviceDataMap",
  "States": {
    "updateDeviceDataMap": {
      "Type": "Map",
      "End": true,
      "Parameters": {
        "transactItems.$": "$$.Map.Item.Value.TransactItems"
      },
      "Iterator": {
        "StartAt": "updateDeviceData",
        "States": {
          "updateDeviceData": {
            "End": true,
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:dynamodb:transactWriteItems",
            "Parameters": {
              "TransactItems.$": "$.transactItems"
            }
          }
        }
      },
      "ItemsPath": "$.mapItems"
    }
  }
}

動作

以下のようなペイロードを入力にしてステートマシンを実行します。1件目のTransactItemsは25件、2件目のTransactItemsは2件のUpdateItem操作を指定しています。(大きいので折りたたんでいます。)

入力
{
  "mapItems": [
    {
      "TransactItems": [
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d001" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d002" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d003" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "15" },
              ":deviceName": { "S": "デバイス003" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d004" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス004" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d005" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d006" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d007" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "15" },
              ":deviceName": { "S": "デバイス007" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d008" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス008" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d009" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d010" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d011" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "15" },
              ":deviceName": { "S": "デバイス011" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d012" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス012" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d013" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d014" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d015" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "15" },
              ":deviceName": { "S": "デバイス015" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d016" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス016" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },

        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d017" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d018" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d019" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "15" },
              ":deviceName": { "S": "デバイス019" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d020" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス020" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d021" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d022" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d023" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "15" },
              ":deviceName": { "S": "デバイス023" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d024" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス024" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d025" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        }
      ]
    },
    {
      "TransactItems": [
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d026" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "10" }
            },
            "ExpressionAttributeNames": {
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #temperature = :temperature"
          }
        },
        {
          "Update": {
            "TableName": "deviceTable",
            "Key": { "deviceId": { "S": "d027" } },
            "ExpressionAttributeValues": {
              ":temperature": { "N": "30" },
              ":deviceName": { "S": "デバイス027" }
            },
            "ExpressionAttributeNames": {
              "#deviceName": "deviceName",
              "#temperature": "temperature"
            },
            "UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
          }
        }
      ]
    }
  ]
}

テーブルのアイテム一覧を見ると、26件以上のアイテムがUpdateItemできています。

制限

1件のTransactItemsで指定可能な操作件数は25件まで

今回回避しようとした制限です。1件のTransactItemsで26件以上を指定した場合は以下のようなエラーによりMapイテレーションの実行が失敗します。

at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 25 (Service: DynamoDb, Status Code: 400, Request ID: 378b1607-ed9b-4201-a4a3-4932081c51fd, Extended Request ID: null)"

ステートマシンの最大入出力サイズは256KB

ステートマシンのタスク、状態や実行の最大入出力サイズは256KB(262,144バイト)です。

256KBを超えるサイズのデータを扱いたい場合は、S3バケットにデータを一時保存するなどして256KB内に収めるようにします。

ちなみに今回の動作確認で使用した入力データは約14KBでした。

参考

以上