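Hello, this is Wakatsuki from the IoT Business Division of the CX Business Headquarters.
This time, I used AWS CDK to build a setup in which an AWS Step Functions state machine performs 26 or more UpdateItem operations against an Amazon DynamoDB table.
TransactWriteItems can process at most 25 updates at a time
In a previous entry, I used TransactWriteItems to run UpdateItem on multiple items in a DynamoDB table from a state machine.
However, TransactWriteItems can process at most 25 operations in a single call.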
TransactWriteItems is a synchronous write operation that groups up to 25 action requests.
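So this time, I implement 26 or more UpdateItem operations from a state machine by running the TransactWriteItems operation in a loop with a Map state. Note that each Map iteration is its own transaction, so all-or-nothing behavior holds only within each group of up to 25 operations, not across the whole set.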
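To feed a Map state like this, the list of operations first has to be split into groups of at most 25. The following is a minimal sketch of that split, not code from the original setup; the names `chunk` and `MAX_TRANSACT_ITEMS` are hypothetical and introduced only for illustration.

```typescript
// Hypothetical constant: the per-call limit quoted above.
const MAX_TRANSACT_ITEMS = 25;

// Split a list into consecutive groups of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (size < 1 || Math.floor(size) !== size) {
    throw new Error('size must be a positive integer');
  }
  const groups: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    groups.push(items.slice(start, start + size));
  }
  return groups;
}

// Usage: 27 placeholder operations become one group of 25 and one group of 2.
const placeholderOperations: number[] = [];
for (let i = 1; i <= 27; i++) {
  placeholderOperations.push(i);
}
const groupedOperations = chunk(placeholderOperations, MAX_TRANSACT_ITEMS);
const groupSizes = groupedOperations.map((group) => group.length); // 25 and 2
```

Each resulting group would then become one element of the array that the Map state iterates over.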
Trying it out
CDK code
lib/aws-cdk-app-stack.ts
import * as cdk from '@aws-cdk/core';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import * as dynamodb from '@aws-cdk/aws-dynamodb';

export class AwsCdkAppStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // DynamoDB table targeted by the UpdateItem operations
    const deviceTable = new dynamodb.Table(this, 'deviceTable', {
      tableName: 'deviceTable',
      partitionKey: { name: 'deviceId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // transactWriteItems action (run once per Map iteration)
    const updateDeviceData = new tasks.CallAwsService(
      this,
      'updateDeviceData',
      {
        service: 'dynamodb',
        action: 'transactWriteItems',
        parameters: {
          'TransactItems.$': '$.transactItems',
        },
        iamResources: [deviceTable.tableArn],
        iamAction: 'dynamodb:*',
      }
    );

    // Map state: iterates over $.mapItems and passes each element's TransactItems to the task
    const updateDeviceDataMap = new sfn.Map(this, 'updateDeviceDataMap', {
      itemsPath: sfn.JsonPath.stringAt('$.mapItems'),
      parameters: {
        'transactItems.$': '$$.Map.Item.Value.TransactItems',
      },
    });
    updateDeviceDataMap.iterator(updateDeviceData);

    // State machine
    new sfn.StateMachine(this, 'updateDeviceDataStateMachine', {
      stateMachineName: 'updateDeviceDataStateMachine',
      definition: updateDeviceDataMap,
    });
  }
}
Deploy the stack with cdk deploy.
This creates a state machine with the following definition.
{
  "StartAt": "updateDeviceDataMap",
  "States": {
    "updateDeviceDataMap": {
      "Type": "Map",
      "End": true,
      "Parameters": {
        "transactItems.$": "$$.Map.Item.Value.TransactItems"
      },
      "Iterator": {
        "StartAt": "updateDeviceData",
        "States": {
          "updateDeviceData": {
            "End": true,
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:dynamodb:transactWriteItems",
            "Parameters": {
              "TransactItems.$": "$.transactItems"
            }
          }
        }
      },
      "ItemsPath": "$.mapItems"
    }
  }
}
Behavior
Run the state machine with a payload like the following as input. The first TransactItems specifies 25 UpdateItem operations and the second specifies 2.
Input
{
"mapItems": [
{
"TransactItems": [
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d001" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d002" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d003" } },
"ExpressionAttributeValues": {
":temperature": { "N": "15" },
":deviceName": { "S": "デバイス003" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d004" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス004" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d005" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d006" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d007" } },
"ExpressionAttributeValues": {
":temperature": { "N": "15" },
":deviceName": { "S": "デバイス007" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d008" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス008" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d009" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d010" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d011" } },
"ExpressionAttributeValues": {
":temperature": { "N": "15" },
":deviceName": { "S": "デバイス011" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d012" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス012" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d013" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d014" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d015" } },
"ExpressionAttributeValues": {
":temperature": { "N": "15" },
":deviceName": { "S": "デバイス015" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d016" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス016" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d017" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d018" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d019" } },
"ExpressionAttributeValues": {
":temperature": { "N": "15" },
":deviceName": { "S": "デバイス019" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d020" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス020" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d021" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d022" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d023" } },
"ExpressionAttributeValues": {
":temperature": { "N": "15" },
":deviceName": { "S": "デバイス023" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d024" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス024" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d025" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
}
]
},
{
"TransactItems": [
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d026" } },
"ExpressionAttributeValues": {
":temperature": { "N": "10" }
},
"ExpressionAttributeNames": {
"#temperature": "temperature"
},
"UpdateExpression": "SET #temperature = :temperature"
}
},
{
"Update": {
"TableName": "deviceTable",
"Key": { "deviceId": { "S": "d027" } },
"ExpressionAttributeValues": {
":temperature": { "N": "30" },
":deviceName": { "S": "デバイス027" }
},
"ExpressionAttributeNames": {
"#deviceName": "deviceName",
"#temperature": "temperature"
},
"UpdateExpression": "SET #deviceName = :deviceName, #temperature = :temperature"
}
}
]
}
]
}
Looking at the list of items in the table, the UpdateItem operations succeeded for 26 or more items (27 in this run).
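The input above is long to write by hand. As a rough sketch, a payload of the same shape could be generated from a list of readings. The `DeviceReading` type and the functions `toUpdateOperation` and `buildStateMachineInput` are hypothetical names introduced for this example; only the `mapItems` / `TransactItems` / `Update` structure is taken from the input shown above.

```typescript
// Hypothetical shape of one reading to be written to the table.
interface DeviceReading {
  deviceId: string;
  temperature: number;
  deviceName?: string;
}

type AttributeValue = { N: string } | { S: string };

interface UpdateOperation {
  Update: {
    TableName: string;
    Key: { deviceId: { S: string } };
    ExpressionAttributeValues: { [placeholder: string]: AttributeValue };
    ExpressionAttributeNames: { [placeholder: string]: string };
    UpdateExpression: string;
  };
}

// Build one Update entry in the same form as the input payload above.
function toUpdateOperation(reading: DeviceReading, tableName: string): UpdateOperation {
  const names: { [placeholder: string]: string } = { '#temperature': 'temperature' };
  const values: { [placeholder: string]: AttributeValue } = {
    ':temperature': { N: String(reading.temperature) },
  };
  const assignments: string[] = ['#temperature = :temperature'];
  if (reading.deviceName !== undefined) {
    names['#deviceName'] = 'deviceName';
    values[':deviceName'] = { S: reading.deviceName };
    assignments.unshift('#deviceName = :deviceName');
  }
  return {
    Update: {
      TableName: tableName,
      Key: { deviceId: { S: reading.deviceId } },
      ExpressionAttributeValues: values,
      ExpressionAttributeNames: names,
      UpdateExpression: 'SET ' + assignments.join(', '),
    },
  };
}

// Group the operations into batches (25 by default) under the mapItems key.
function buildStateMachineInput(
  readings: DeviceReading[],
  tableName: string,
  batchSize: number = 25
): { mapItems: { TransactItems: UpdateOperation[] }[] } {
  const mapItems: { TransactItems: UpdateOperation[] }[] = [];
  for (let start = 0; start < readings.length; start += batchSize) {
    const batch = readings.slice(start, start + batchSize);
    mapItems.push({ TransactItems: batch.map((r) => toUpdateOperation(r, tableName)) });
  }
  return { mapItems };
}

// Usage: 27 readings with IDs d001 through d027, as in the test above.
const sampleReadings: DeviceReading[] = [];
for (let i = 1; i <= 27; i++) {
  sampleReadings.push({ deviceId: 'd' + ('00' + i).slice(-3), temperature: 10 });
}
const generatedInput = buildStateMachineInput(sampleReadings, 'deviceTable');
```

`JSON.stringify(generatedInput)` would then give a string that can be passed as the execution input.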
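Limitations
A single TransactItems can specify at most 25 operations
This is the limit this setup was meant to work around. If a single TransactItems specifies 26 or more operations, that Map iteration fails with an error like the following.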
at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 25 (Service: DynamoDb, Status Code: 400, Request ID: 378b1607-ed9b-4201-a4a3-4932081c51fd, Extended Request ID: null)"
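Because this error only surfaces once the Map iteration has called DynamoDB, it may be convenient to check the batch sizes before starting an execution. The following is a minimal sketch under that assumption; `findOversizedBatches` and `MAX_ACTIONS_PER_TRANSACTION` are hypothetical names, and the limit of 25 is the one stated in this article.

```typescript
// Hypothetical constant: the per-transaction limit described above.
const MAX_ACTIONS_PER_TRANSACTION = 25;

interface MapItem {
  TransactItems: unknown[];
}

// Return the indexes of mapItems entries whose TransactItems exceed the limit.
function findOversizedBatches(
  mapItems: MapItem[],
  limit: number = MAX_ACTIONS_PER_TRANSACTION
): number[] {
  const oversized: number[] = [];
  for (let i = 0; i < mapItems.length; i++) {
    if (mapItems[i].TransactItems.length > limit) {
      oversized.push(i);
    }
  }
  return oversized;
}

// Usage: the first batch has 26 placeholder entries, so index 0 is reported.
const batchesToCheck: MapItem[] = [
  { TransactItems: new Array(26) },
  { TransactItems: new Array(2) },
];
const oversizedIndexes = findOversizedBatches(batchesToCheck);
```

An empty result means every batch is within the limit; otherwise the reported batches need to be split further before the input is submitted.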
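The maximum input/output size for a state machine is 256 KB
The maximum input or output size for a task, state, or execution in a state machine is 256 KB (262,144 bytes).
To handle data larger than 256 KB, keep the payload within 256 KB by, for example, temporarily storing the data in an S3 bucket.
Incidentally, the input data used for this test was about 14 KB.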
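To see how close a payload is to this limit, its serialized size can be measured before starting an execution. The following is a sketch that assumes the limit applies to the UTF-8 encoded JSON string; `utf8ByteLength`, `payloadSizeInBytes`, and `fitsInStepFunctionsPayload` are hypothetical helper names, and the byte count is computed by hand so the example has no runtime dependencies.

```typescript
// 256 KB, the quota quoted above.
const STEP_FUNCTIONS_MAX_PAYLOAD_BYTES = 262144;

// Count the bytes a string occupies when encoded as UTF-8.
function utf8ByteLength(text: string): number {
  let bytes = 0;
  for (let i = 0; i < text.length; i++) {
    const unit = text.charCodeAt(i);
    if (unit <= 0x7f) {
      bytes += 1;
    } else if (unit <= 0x7ff) {
      bytes += 2;
    } else if (unit >= 0xd800 && unit <= 0xdbff && i + 1 < text.length) {
      const next = text.charCodeAt(i + 1);
      if (next >= 0xdc00 && next <= 0xdfff) {
        // A surrogate pair is one code point outside the BMP: 4 bytes.
        bytes += 4;
        i++;
      } else {
        bytes += 3;
      }
    } else {
      bytes += 3;
    }
  }
  return bytes;
}

// Size of the payload once serialized as JSON.
function payloadSizeInBytes(payload: object): number {
  return utf8ByteLength(JSON.stringify(payload));
}

function fitsInStepFunctionsPayload(payload: object): boolean {
  return payloadSizeInBytes(payload) <= STEP_FUNCTIONS_MAX_PAYLOAD_BYTES;
}

// Usage: a tiny payload is far below the limit.
const tinyPayload = { mapItems: [{ TransactItems: [] }] };
const tinyPayloadFits = fitsInStepFunctionsPayload(tinyPayload);
```

Non-ASCII values such as the Japanese device names in the input above take 3 bytes per character in UTF-8, so the byte size can be noticeably larger than the character count.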
That's all.